[FLINK-8966][tests] Port AvroExternalJarProgramITCase to flip6

This closes #5766.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1a8dd06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1a8dd06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1a8dd06

Branch: refs/heads/master
Commit: f1a8dd0654030d6b4bd79732ef1d0f32c5f6820e
Parents: 3947a39
Author: zentol <ches...@apache.org>
Authored: Tue Apr 3 11:20:12 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 4 08:59:45 2018 +0200

----------------------------------------------------------------------
 .../avro/AvroExternalJarProgramITCase.java      | 75 +++++++---------
 .../LegacyAvroExternalJarProgramITCase.java     | 92 ++++++++++++++++++++
 2 files changed, 124 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1a8dd06/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
index 985471a..6766947 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
@@ -19,74 +19,63 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.File;
-import java.net.URL;
 import java.util.Collections;
 
 /**
  * IT case for the {@link AvroExternalJarProgram}.
  */
+@Category(New.class)
 public class AvroExternalJarProgramITCase extends TestLogger {
 
        private static final String JAR_FILE = "maven-test-jar.jar";
 
        private static final String TEST_DATA_FILE = "/testdata.avro";
 
-       @Test
-       public void testExternalProgram() {
-
-               LocalFlinkMiniCluster testMiniCluster = null;
+       private static final int PARALLELISM = 4;
 
-               try {
-                       int parallelism = 4;
-                       Configuration config = new Configuration();
-                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
-                       testMiniCluster = new LocalFlinkMiniCluster(config, false);
-                       testMiniCluster.start();
+       private static final MiniCluster MINI_CLUSTER = new MiniCluster(
+               new MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(PARALLELISM)
+                       .build());
 
-                       String jarFile = JAR_FILE;
-                       String testData = getClass().getResource(TEST_DATA_FILE).toString();
+       @BeforeClass
+       public static void setUp() throws Exception {
+               MINI_CLUSTER.start();
+       }
 
-                       PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+       @AfterClass
+       public static void tearDown() {
+               TestEnvironment.unsetAsContext();
+               MINI_CLUSTER.closeAsync();
+       }
 
-                       TestEnvironment.setAsContext(
-                               testMiniCluster,
-                               parallelism,
-                               Collections.singleton(new Path(jarFile)),
-                               Collections.<URL>emptyList());
+       @Test
+       public void testExternalProgram() throws Exception {
+               TestEnvironment.setAsContext(
+                       MINI_CLUSTER,
+                       PARALLELISM,
+                       Collections.singleton(new Path(JAR_FILE)),
+                       Collections.emptyList());
 
-                       config.setString(JobManagerOptions.ADDRESS, "localhost");
-                       config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+               String testData = getClass().getResource(TEST_DATA_FILE).toString();
 
-                       program.invokeInteractiveModeForExecution();
-               }
-               catch (Throwable t) {
-                       System.err.println(t.getMessage());
-                       t.printStackTrace();
-                       Assert.fail("Error during the packaged program execution: " + t.getMessage());
-               }
-               finally {
-                       TestEnvironment.unsetAsContext();
+               PackagedProgram program = new PackagedProgram(new File(JAR_FILE), new String[]{testData});
 
-                       if (testMiniCluster != null) {
-                               try {
-                                       testMiniCluster.stop();
-                               } catch (Throwable t) {
-                                       // ignore
-                               }
-                       }
-               }
+               program.invokeInteractiveModeForExecution();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a8dd06/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..1dd56a7
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class LegacyAvroExternalJarProgramITCase extends TestLogger {
+
+       private static final String JAR_FILE = "maven-test-jar.jar";
+
+       private static final String TEST_DATA_FILE = "/testdata.avro";
+
+       @Test
+       public void testExternalProgram() {
+
+               LocalFlinkMiniCluster testMiniCluster = null;
+
+               try {
+                       int parallelism = 4;
+                       Configuration config = new Configuration();
+                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+                       testMiniCluster = new LocalFlinkMiniCluster(config, false);
+                       testMiniCluster.start();
+
+                       String jarFile = JAR_FILE;
+                       String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+                       PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+                       TestEnvironment.setAsContext(
+                               testMiniCluster,
+                               parallelism,
+                               Collections.singleton(new Path(jarFile)),
+                               Collections.<URL>emptyList());
+
+                       config.setString(JobManagerOptions.ADDRESS, "localhost");
+                       config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+
+                       program.invokeInteractiveModeForExecution();
+               }
+               catch (Throwable t) {
+                       System.err.println(t.getMessage());
+                       t.printStackTrace();
+                       Assert.fail("Error during the packaged program execution: " + t.getMessage());
+               }
+               finally {
+                       TestEnvironment.unsetAsContext();
+
+                       if (testMiniCluster != null) {
+                               try {
+                                       testMiniCluster.stop();
+                               } catch (Throwable t) {
+                                       // ignore
+                               }
+                       }
+               }
+       }
+}

Reply via email to