http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java index 2c8a257..c173389 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java @@ -15,9 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.python.util.serialization; import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; @@ -26,34 +28,31 @@ import org.python.core.PyObject; import java.io.IOException; /** - * A serializer implementation for PyObject class type. Is is used by the Kryo serialization - * framework. {@see https://github.com/EsotericSoftware/kryo#serializers} + * A {@link Serializer} implementation for {@link PyObject} class type. 
*/ public class PyObjectSerializer extends Serializer<PyObject> { - public void write (Kryo kryo, Output output, PyObject po) { + public void write(Kryo kryo, Output output, PyObject po) { try { byte[] serPo = SerializationUtils.serializeObject(po); output.writeInt(serPo.length); output.write(serPo); } catch (IOException e) { - e.printStackTrace(); + throw new KryoException("Failed to serialize object.", e); } } - public PyObject read (Kryo kryo, Input input, Class<PyObject> type) { + public PyObject read(Kryo kryo, Input input, Class<PyObject> type) { int len = input.readInt(); - byte[] serPo = new byte[len]; - input.read(serPo); - PyObject po = null; + byte[] serPo = input.readBytes(len); try { - po = (PyObject) SerializationUtils.deserializeObject(serPo); + return (PyObject) SerializationUtils.deserializeObject(serPo); } catch (IOException e) { - e.printStackTrace(); + throw new KryoException("Failed to deserialize object.", e); } catch (ClassNotFoundException e) { - e.printStackTrace(); + // this should only be possible if jython isn't on the class-path + throw new KryoException("Failed to deserialize object.", e); } - return po; } }
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyStringSerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyStringSerializer.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyStringSerializer.java new file mode 100644 index 0000000..6988428 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyStringSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.python.util.serialization; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.python.core.PyString; + +/** + * A {@link Serializer} implementation for {@link PyString} class type. 
+ */ +public class PyStringSerializer extends Serializer<PyString> { + @Override + public void write(Kryo kryo, Output output, PyString object) { + output.writeString(object.asString()); + } + + @Override + public PyString read(Kryo kryo, Input input, Class<PyString> type) { + return new PyString(input.readString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java deleted file mode 100644 index a51db89..0000000 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.python.util.serialization; - -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.io.IOException; - -/** - * A Python deserialization schema, which implements {@link DeserializationSchema}. It converts a - * serialized form of {@code PyObject} into its Java Object representation. - */ -public class PythonDeserializationSchema implements DeserializationSchema<Object> { - private static final long serialVersionUID = -9180596504893036458L; - private final TypeInformation<Object> resultType = TypeInformation.of(new TypeHint<Object>(){}); - - private final byte[] serSchema; - private transient DeserializationSchema<Object> schema; - - public PythonDeserializationSchema(DeserializationSchema<Object> schema) throws IOException { - this.serSchema = SerializationUtils.serializeObject(schema); - } - - @SuppressWarnings("unchecked") - public Object deserialize(byte[] message) throws IOException { - if (this.schema == null) { - try { - this.schema = (DeserializationSchema<Object>) SerializationUtils.deserializeObject(this.serSchema); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } - } - return this.schema.deserialize(message); - } - - @Override - public boolean isEndOfStream(Object nextElement) { - return this.schema.isEndOfStream(nextElement); - } - - @Override - public TypeInformation<Object> getProducedType() { - return resultType; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java index a2dddaf..30f61c0 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java @@ -15,16 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.python.util.serialization; -import java.io.IOException; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.python.core.PyObject; +import java.io.IOException; + /** - * A Python serialization schema, which implements {@link SerializationSchema}. It converts - * a {@code PyObject} into its serialized form. + * A {@link SerializationSchema} for {@link PyObject}s. 
*/ public class PythonSerializationSchema implements SerializationSchema<PyObject> { private static final long serialVersionUID = -9170596504893036458L; @@ -41,11 +43,9 @@ public class PythonSerializationSchema implements SerializationSchema<PyObject> public byte[] serialize(PyObject element) { if (this.schema == null) { try { - this.schema = (SerializationSchema<PyObject>)SerializationUtils.deserializeObject(this.serSchema); - } catch (IOException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); + this.schema = SerializationUtils.deserializeObject(this.serSchema); + } catch (Exception e) { + throw new FlinkRuntimeException("Schema could not be deserialized.", e); } } return this.schema.serialize(element); http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java index b7fc2a3..eaf11ea 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java @@ -15,15 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.streaming.python.util.serialization; -import org.apache.flink.streaming.python.api.datastream.PythonObjectInputStream2; +import org.python.util.PythonObjectInputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; /** * A generic serialization utility, which turns java objects into their serialization form as @@ -38,9 +41,52 @@ public class SerializationUtils { } } - public static Object deserializeObject(byte[] bytes) throws IOException, ClassNotFoundException { - try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); ObjectInputStream ois = new PythonObjectInputStream2(bais)) { - return ois.readObject(); + @SuppressWarnings("unchecked") + public static <X> X deserializeObject(byte[] bytes) throws IOException, ClassNotFoundException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); ObjectInputStream ois = new SerialVersionOverridingPythonObjectInputStream(bais)) { + return (X) ois.readObject(); + } + } + + /** + * A {@link PythonObjectInputStream} that ignores SerialVersionUID mismatches. + * + * <p>Classes generated by Jython have a non-deterministic serialVersionUID, which can prevent deserialization of + * user-defined python functions in a different JVM. + * + * <p>The serialVersionUID is calculated as a 64-bit hash of the class name, interface class names, methods, and fields. + * If a Python class inherits from a Java class, as in the case of Python UDFs, then a proxy wrapper class is created. + * Its name is constructed using the following pattern: + * <b>{@code org.python.proxies.<module-name>$<UDF-class-name>$<number>}</b>. + * The {@code <number>} part is increased by one at runtime, for every job submission. This results in different IDs for + * each run for the same Python class.
+ */ + private static class SerialVersionOverridingPythonObjectInputStream extends PythonObjectInputStream { + + private SerialVersionOverridingPythonObjectInputStream(InputStream in) throws IOException { + super(in); + } + + protected ObjectStreamClass readClassDescriptor() throws ClassNotFoundException, IOException { + ObjectStreamClass readClassDescriptor = super.readClassDescriptor(); // initially streams descriptor + + Class<?> localClass; + try { + localClass = resolveClass(readClassDescriptor); + } catch (ClassNotFoundException e) { + return readClassDescriptor; + } + + ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass); + if (localClassDescriptor != null) { // only if class implements serializable + final long localSUID = localClassDescriptor.getSerialVersionUID(); + final long readSUID = readClassDescriptor.getSerialVersionUID(); + if (readSUID != localSUID) { + // Overriding serialized class version mismatch + readClassDescriptor = localClassDescriptor; // Use local class descriptor for deserialization + } + } + return readClassDescriptor; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java index 5e4ec51..9b3ddd3 100644 --- a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java +++ b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java @@ -15,86 +15,77 @@ * See the License for the specific language governing permissions and * limitations under the 
License. */ + package org.apache.flink.streaming.python.api; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalFileSystem; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.util.StreamingProgramTestBase; -import org.apache.flink.streaming.python.api.PythonStreamBinder; +import org.apache.flink.util.Preconditions; + +import org.python.core.PyException; -import java.io.File; -import java.io.FileNotFoundException; import java.util.ArrayList; import java.util.List; -import java.util.ListIterator; +/** + * Tests for the {@link PythonStreamBinder}. + */ public class PythonStreamBinderTest extends StreamingProgramTestBase { - final private static String defaultPythonScriptName = "run_all_tests.py"; - final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python"; - final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api"; - - public PythonStreamBinderTest() { - } - public static void main(String[] args) throws Exception { - if (args.length == 0) { - args = prepareDefaultArgs(); - } else { - args[0] = findStreamTestFile(args[0]).getAbsolutePath(); - } - PythonStreamBinder.main(args); - } - - @Override - public void testProgram() throws Exception { - this.main(new String[]{}); + private static Path getBaseTestPythonDir() { + FileSystem fs = new LocalFileSystem(); + return new Path(fs.getWorkingDirectory(), "src/test/python/org/apache/flink/streaming/python/api"); } - private static String[] prepareDefaultArgs() throws Exception { - File testFullPath = findStreamTestFile(defaultPythonScriptName); - List<String> filesInTestPath = getFilesInFolder(testFullPath.getParent()); - - String[] args = new String[filesInTestPath.size() + 1]; - args[0] = testFullPath.getAbsolutePath(); - - for 
(final ListIterator<String> it = filesInTestPath.listIterator(); it.hasNext();) { - final String p = it.next(); - args[it.previousIndex() + 1] = p; - } - return args; + private static Path findUtilsModule() { + return new Path(getBaseTestPythonDir(), "utils"); } - private static File findStreamTestFile(String name) throws Exception { - if (new File(name).exists()) { - return new File(name); - } + private static List<String> findTestFiles() throws Exception { + List<String> files = new ArrayList<>(); FileSystem fs = FileSystem.getLocalFileSystem(); - String workingDir = fs.getWorkingDirectory().getPath(); - if (!workingDir.endsWith(flinkPythonRltvPath)) { - workingDir += File.separator + flinkPythonRltvPath; - } - FileStatus[] status = fs.listStatus( - new Path( workingDir + File.separator + pathToStreamingTests)); + FileStatus[] status = fs.listStatus(getBaseTestPythonDir()); for (FileStatus f : status) { - String file_name = f.getPath().getName(); - if (file_name.equals(name)) { - return new File(f.getPath().getPath()); + Path filePath = f.getPath(); + String fileName = filePath.getName(); + if (fileName.startsWith("test_") && fileName.endsWith(".py")) { + files.add(filePath.getPath()); } } - throw new FileNotFoundException(); + return files; } - private static List<String> getFilesInFolder(String path) { - List<String> results = new ArrayList<>(); - File[] files = new File(path).listFiles(); - if (files != null) { - for (File file : files) { - if (file.isDirectory() || file.getName().startsWith("test_")) { - results.add("." 
+ File.separator + file.getName()); - } + @Override + public void testProgram() throws Exception { + Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py"); + List<String> testFiles = findTestFiles(); + + Preconditions.checkState(testFiles.size() > 0, "No test files were found in {}.", getBaseTestPythonDir()); + + String[] arguments = new String[1 + 1 + testFiles.size()]; + arguments[0] = testEntryPoint.getPath(); + arguments[1] = findUtilsModule().getPath(); + int index = 2; + for (String testFile : testFiles) { + arguments[index] = testFile; + index++; + } + try { + new PythonStreamBinder(new Configuration()) + .runPlan(arguments); + } catch (PyException e) { + if (e.getCause() instanceof JobExecutionException) { + // JobExecutionExceptions are wrapped again by the jython interpreter resulting in horrible stacktraces + throw (JobExecutionException) e.getCause(); + } else { + // probably caused by some issue in the main script itself + throw e; } } - return results; } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/fibonacci.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/fibonacci.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/fibonacci.py new file mode 100644 index 0000000..972e62f --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/fibonacci.py @@ -0,0 +1,122 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# +# An example that illustrates iterations in Flink streaming. The program sums up random numbers and counts +# additions. The operation is done until it reaches a specific threshold in an iterative streaming fashion. +# + +import random +import argparse + +from org.apache.flink.api.common.functions import MapFunction +from org.apache.flink.streaming.api.collector.selector import OutputSelector +from org.apache.flink.streaming.api.functions.source import SourceFunction + +TARGET_VAL = 100 +MAX_INT_START = 50 + + +class Generator(SourceFunction): + def __init__(self, num_iters=1000): + self._running = True + self._num_iters = num_iters + + def run(self, ctx): + counter = 0 + while self._running and counter < self._num_iters: + self.do(ctx) + counter += 1 + + def do(self, ctx): + two_numbers = "{}, {}".format(random.randrange(1, MAX_INT_START), random.randrange(1, MAX_INT_START)) + ctx.collect(two_numbers) + + def cancel(self): + self._running = False + + +class Step(MapFunction): + def map(self, value): + return (value[0], value[1], value[3], value[2] + value[3], value[4] + 1) + + +class InputMap(MapFunction): + def map(self, value): + num1, num2 = value.split(",") + + num1 = int(num1.strip()) + num2 = int(num2.strip()) + + return (num1, num2, num1, num2, 0) + + +class OutPut(MapFunction): + def map(self, value): + 
return ((value[0], value[1]), value[4]) + + +class Selector(OutputSelector): + def select(self, value): + return ["iterate"] if value[2] < TARGET_VAL and value[3] < TARGET_VAL else ["output"] + + +class Main: + def run(self, flink, args): + env = flink.get_execution_environment() + + # create input stream of integer pairs + if args.input: + input_stream = env.read_text_file(args.input) + else: + input_stream = env.create_python_source(Generator(num_iters=50)) + + # create an iterative data stream from the input with 5 second timeout + it = input_stream\ + .map(InputMap())\ + .iterate(5000) + + # apply the step function to get the next Fibonacci number + # increment the counter and split the output with the output selector + step = it\ + .map(Step())\ + .split(Selector()) + + # close the iteration by selecting the tuples that were directed to the + # 'iterate' channel in the output selector + it.close_with(step.select("iterate")) + + # to produce the final output select the tuples directed to the + # 'output' channel then get the input pairs that have the greatest iteration counter + # on a 1 second sliding window + output = step.select("output") + parsed_output = output.map(OutPut()) + if args.output: + parsed_output.write_as_text(args.output) + else: + parsed_output.output() + result = env.execute("Fibonacci Example (py)") + print("Fibonacci job completed, job_id={}".format(result.jobID)) + + +def main(flink): + parser = argparse.ArgumentParser(description='Fibonacci.') + parser.add_argument('--input', metavar='IN', help='input file path') + parser.add_argument('--output', metavar='OUT', help='output file path') + args = parser.parse_args() + Main().run(flink, args) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py new file mode 100644 index 0000000..2254898 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py @@ -0,0 +1,122 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import argparse + +from org.apache.flink.api.common.functions import FlatMapFunction +from org.apache.flink.api.common.functions import ReduceFunction +from org.apache.flink.api.java.functions import KeySelector + +TARGET_VAL = 100 +MAX_INT_START = 50 + +default_input_data = [ + "To be, or not to be,--that is the question:--", + "Whether 'tis nobler in the mind to suffer", + "The slings and arrows of outrageous fortune", + "Or to take arms against a sea of troubles,", + "And by opposing end them?--To die,--to sleep,--", + "No more; and by a sleep to say we end", + "The heartache, and the thousand natural shocks", + "That flesh is heir to,--'tis a consummation", + "Devoutly to be wish'd. To die,--to sleep;--", + "To sleep! perchance to dream:--ay, there's the rub;", + "For in that sleep of death what dreams may come,", + "When we have shuffled off this mortal coil,", + "Must give us pause: there's the respect", + "That makes calamity of so long life;", + "For who would bear the whips and scorns of time,", + "The oppressor's wrong, the proud man's contumely,", + "The pangs of despis'd love, the law's delay,", + "The insolence of office, and the spurns", + "That patient merit of the unworthy takes,", + "When he himself might his quietus make", + "With a bare bodkin? 
who would these fardels bear,", + "To grunt and sweat under a weary life,", + "But that the dread of something after death,--", + "The undiscover'd country, from whose bourn", + "No traveller returns,--puzzles the will,", + "And makes us rather bear those ills we have", + "Than fly to others that we know not of?", + "Thus conscience does make cowards of us all;", + "And thus the native hue of resolution", + "Is sicklied o'er with the pale cast of thought;", + "And enterprises of great pith and moment,", + "With this regard, their currents turn awry,", + "And lose the name of action.--Soft you now!", + "The fair Ophelia!--Nymph, in thy orisons", + "Be all my sins remember'd." +] + + +class Tokenizer(FlatMapFunction): + def flatMap(self, value, collector): + for word in value.lower().split(): + collector.collect((1, word)) + + +class Sum(ReduceFunction): + def reduce(self, value1, value2): + return (value1[0] + value2[0], value1[1]) + + +class Selector(KeySelector): + def getKey(self, input): + return input[1] + + +class Main: + def run(self, flink, args): + env = flink.get_execution_environment() + + if args.input: + text = env.read_text_file(args.input) + else: + text = env.from_collection(default_input_data) + + counts = text \ + .flat_map(Tokenizer())\ + .key_by(Selector())\ + .reduce(Sum()) + + if args.output: + counts.write_as_text(args.output) + else: + counts.output() + env.execute("Wordcount Example (py)") + +""" +Implements the "WordCount" program that computes a simple word occurrence +histogram over text files in a streaming fashion. + +The input is a plain text file with lines separated by newline characters. + +Usage: WordCount --input <path> --output <path> +If no parameters are provided, the program is run with default data. + +This example shows how to: + * write a simple Flink Streaming program, + * use tuple data types, + * write and use user-defined functions. 
+""" +def main(flink): + parser = argparse.ArgumentParser(description='WordCount.') + parser.add_argument('--input', metavar='IN', help='input file path') + parser.add_argument('--output', metavar='OUT', help='output file path') + args = parser.parse_args() + Main().run(flink, args) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py index 1970918..b8378cc 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py @@ -16,10 +16,9 @@ # limitations under the License. 
################################################################################ import sys -from os.path import dirname, join, basename +import traceback from glob import glob -from org.apache.flink.runtime.client import JobExecutionException - +from os.path import dirname, join, basename excluded_tests = [ 'test_kafka09', @@ -30,46 +29,30 @@ class Main: def __init__(self): pass - def run(self): + def run(self, flink): tests = [] - pwd = dirname(sys.argv[0]) - print("Working directory: {}".format(pwd)) + + current_dir = dirname(sys.modules[__name__].__file__) + print("Working directory: {}".format(current_dir)) if excluded_tests: print("Excluded tests: {}\n".format(excluded_tests)) - for x in glob(join(pwd, 'test_*.py')): + for x in glob(join(current_dir, 'test_*.py')): if not x.startswith('__'): test_module_name = basename(x)[:-3] if test_module_name not in excluded_tests: tests.append(__import__(test_module_name, globals(), locals())) - failed_tests = [] - for test in tests: - print("Submitting job ... '{}'".format(test.__name__)) - try: - test.main() + try: + for test in tests: + print("Submitting job ... 
'{}'".format(test.__name__)) + test.main(flink) print("Job completed ('{}')\n".format(test.__name__)) - except JobExecutionException as ex: - failed_tests.append(test.__name__) - print("\n{}\n{}\n{}\n".format('#'*len(ex.message), ex.message, '#'*len(ex.message))) - except: - failed_tests.append(test.__name__) - ex_type = sys.exc_info()[0] - print("\n{}\n{}\n{}\n".format('#'*len(ex_type), ex_type, '#'*len(ex_type))) - - if failed_tests: - print("\nThe following tests were failed:") - for failed_test in failed_tests: - print("\t* " + failed_test) - raise Exception("\nFailed test(s): {}".format(failed_tests)) - else: - print("\n*** All tests passed successfully ***") - - -def main(): - Main().run() + except Exception as ex: + print ("Test {} has failed\n".format(test.__name__)) + traceback.print_exc() -if __name__ == "__main__": - main() +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py index c32124e..6f25deb 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py @@ -15,14 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ -import sys from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction, FilterFunction from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import milliseconds from utils import constants from utils.pygeneratorbase import PyGeneratorBase -from utils.python_test_base import TestBase class Generator(PyGeneratorBase): @@ -58,25 +56,19 @@ class Selector(KeySelector): return input[1] -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) \ .filter(Filterer()) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(100)) \ .reduce(Sum()) \ - .print() + .output() - env.execute(True) + env.execute() -def main(): - Main().run() +def main(flink): + Main().run(flink) -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py index 68ac3ef..e8ef68a 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py @@ -15,9 +15,8 @@ # See the License for the specific language 
governing permissions and # limitations under the License. ################################################################################ -import sys from utils import constants -from utils.python_test_base import TestBase + from utils.pygeneratorbase import PyGeneratorBase from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction from org.apache.flink.api.java.functions import KeySelector @@ -50,28 +49,19 @@ class Selector(KeySelector): return input[1] -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE) \ .create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(seconds(1)) \ .reduce(Sum()) \ - .print() - - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(str(result.jobID))) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py index 6f6db53..a550985 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py +++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py @@ -15,14 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -import sys from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import seconds from utils import constants from utils.pygeneratorbase import PyGeneratorBase -from utils.python_test_base import TestBase class Generator(PyGeneratorBase): @@ -51,26 +49,18 @@ class Selector(KeySelector): return input[1] -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(seconds(1)) \ .reduce(Sum()) \ - .print() - - env.execute(True) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py index 9f5e7e5..e7e2a77 100644 --- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py @@ -15,14 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -import sys from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import milliseconds from utils import constants from utils.pygeneratorbase import PyGeneratorBase -from utils.python_test_base import TestBase class Generator(PyGeneratorBase): @@ -51,26 +49,18 @@ class Sum(ReduceFunction): return (count1 + count2, word1) -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(50)) \ .reduce(Sum()) \ - .print() - - env.execute(True) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py index 67b1a42..82cfae1 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py @@ -20,7 +20,6 @@ from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import milliseconds from utils import constants -from utils.python_test_base import TestBase class Tokenizer(FlatMapFunction): @@ -40,28 +39,20 @@ class Selector(KeySelector): return input[1] -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): +class Main: + def run(self, flink): elements = ["aa" if iii % 2 == 0 else "bbb" for iii in range(constants.NUM_ELEMENTS_IN_TEST)] - env = self._get_execution_environment() + env = flink.get_execution_environment() env.from_collection(elements) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(10)) \ .reduce(Sum()) \ - .print() - - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(result.jobID)) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py index 59f8cf7..d4ebc2a 100644 --- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py @@ -20,7 +20,6 @@ from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import milliseconds from utils import constants -from utils.python_test_base import TestBase class Tokenizer(FlatMapFunction): @@ -40,28 +39,20 @@ class Selector(KeySelector): return input[1][0] -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): +class Main: + def run(self, flink): elements = [("Alice", 111) if iii % 2 == 0 else ("Bob", 2222) for iii in range(constants.NUM_ELEMENTS_IN_TEST)] - env = self._get_execution_environment() + env = flink.get_execution_environment() env.from_elements(*elements) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(10)) \ .reduce(Sum()) \ - .print() - - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(result.jobID)) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py index 9e615d7..328efc8 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py +++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py @@ -22,7 +22,6 @@ from org.apache.flink.streaming.python.util import PythonIterator from org.apache.flink.streaming.api.windowing.time.Time import milliseconds from utils import constants -from utils.python_test_base import TestBase class SomeIterator(PythonIterator): @@ -59,26 +58,18 @@ class Selector(KeySelector): return input[1] -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() env.from_collection(SomeIterator(constants.NUM_ITERATIONS_IN_TEST)) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(10)) \ .reduce(Sum()) \ - .print() - - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(result.jobID)) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py index 2e24464..3edc7b8 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py @@ -20,7 +20,6 @@ from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import 
milliseconds from utils import constants -from utils.python_test_base import TestBase class Tokenizer(FlatMapFunction): @@ -40,26 +39,18 @@ class Selector(KeySelector): return 1 -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() env.generate_sequence(1, constants.NUM_ITERATIONS_IN_TEST) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(10)) \ .reduce(Sum()) \ - .print() - - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(result.jobID)) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py index eea8d5c..6756508 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py @@ -19,7 +19,6 @@ from org.apache.flink.api.common.functions import FilterFunction from org.apache.flink.api.common.functions import MapFunction from utils import constants -from utils.python_test_base import TestBase class MinusOne(MapFunction): @@ -37,12 +36,9 @@ class LessEquelToZero(FilterFunction): return value <= 0 -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() 
+class Main: + def run(self, flink): + env = flink.get_execution_environment() some_integers = env.from_collection([2] * 5) iterative_stream = some_integers.iterate(constants.MAX_EXECUTION_TIME_MS) @@ -55,15 +51,10 @@ class Main(TestBase): less_then_zero = minus_one_stream.filter(LessEquelToZero()) - less_then_zero.print() - - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(result.jobID)) - + less_then_zero.output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py deleted file mode 100644 index 1be7c17..0000000 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py +++ /dev/null @@ -1,157 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -import sys -import threading -from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction -from org.apache.flink.api.java.functions import KeySelector -from org.apache.flink.api.java.utils import ParameterTool -from org.apache.flink.streaming.python.connectors import PythonFlinkKafkaProducer09, PythonFlinkKafkaConsumer09 -from org.apache.flink.streaming.api.functions.source import SourceFunction -from org.apache.flink.streaming.api.windowing.time.Time import milliseconds -from org.apache.flink.streaming.util.serialization import DeserializationSchema -from org.apache.flink.streaming.util.serialization import SerializationSchema - -from utils import constants -from utils import utils -from utils.python_test_base import TestBase - -KAFKA_DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092" - - -class StringGenerator(SourceFunction): - def __init__(self, msg, end_msg, num_iters=7000): - self._running = True - self._msg = msg - self._end_msg = end_msg - self._num_iters = num_iters - def run(self, ctx): - counter = 0 - while self._running and counter < self._num_iters - 1: - counter += 1 - ctx.collect(self._msg) - ctx.collect(self._end_msg) - def cancel(self): - self._running = False - - -class ToStringSchema(SerializationSchema): - def serialize(self, value): - return str(value) - - -class KafkaStringProducer(threading.Thread, TestBase): - def __init__(self, bootstrap_server, msg, end_msg, num_iters): - threading.Thread.__init__(self) - self._bootstrap_server = 
bootstrap_server - self._msg = msg - self._end_msg = end_msg - # if self._msg[-1] != '\n': self._msg += '\n' - # if self._end_msg[-1] != '\n': self._end_msg += '\n' - self._num_iters = num_iters - - def run(self): - env = self._get_execution_environment() - - stream = env.create_python_source(StringGenerator(self._msg, self._end_msg, num_iters=100)) - - producer = PythonFlinkKafkaProducer09(KAFKA_DEFAULT_BOOTSTRAP_SERVERS, "kafka09-test", ToStringSchema()) - producer.set_log_failures_only(False); # "False" by default - producer.set_flush_on_checkpoint(True); # "True" by default - - stream.add_sink(producer) - - result = env.execute("Kafka09 producer test") - print("Kafka09 producer job completed, job_id={}".format(result.jobID)) - - -class StringDeserializationSchema(DeserializationSchema): - def deserialize(self, message): - return ''.join(map(chr,message)) - - def isEndOfStream(self, element): - return str(element) == "quit" - - -class Tokenizer(FlatMapFunction): - def flatMap(self, value, collector): - for word in value.lower().split(): - collector.collect((1, word)) - - -class Sum(ReduceFunction): - def reduce(self, input1, input2): - count1, val1 = input1 - count2, val2 = input2 - return (count1 + count2, val1) - - -class Selector(KeySelector): - def getKey(self, input): - return input[1] - - -class KafkaStringConsumer(threading.Thread, TestBase): - def __init__(self, bootstrap_server): - threading.Thread.__init__(self) - self._bootstrap_server = bootstrap_server - - def run(self): - parameterTool = ParameterTool.fromArgs(sys.argv[1:]) - props = parameterTool.getProperties() - props.setProperty("bootstrap.servers", self._bootstrap_server) - - consumer = PythonFlinkKafkaConsumer09("kafka09-test", StringDeserializationSchema(), props) - - env = self._get_execution_environment() - env.add_java_source(consumer) \ - .flat_map(Tokenizer()) \ - .key_by(Selector()) \ - .time_window(milliseconds(100)) \ - .reduce(Sum()) \ - .print() - - result = env.execute("Python 
consumer kafka09 test", True) - print("Kafka09 consumer job completed, job_id={}".format(result.jobID)) - - -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - host, port = KAFKA_DEFAULT_BOOTSTRAP_SERVERS.split(":") - if not utils.is_reachable(host, int(port)): - print("Kafka server is not reachable: [{}]".format(KAFKA_DEFAULT_BOOTSTRAP_SERVERS)) - return - - kafka_p = KafkaStringProducer(KAFKA_DEFAULT_BOOTSTRAP_SERVERS, "Hello World", "quit", constants.NUM_ITERATIONS_IN_TEST) - kafka_c = KafkaStringConsumer(KAFKA_DEFAULT_BOOTSTRAP_SERVERS) - - kafka_p.start() - kafka_c.start() - - kafka_p.join() - kafka_c.join() - - -def main(): - Main().run() - - -if __name__ == '__main__': - main() http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py index 07a60c0..fd5f25d 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py @@ -15,12 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ -import sys from org.apache.flink.api.common.functions import ReduceFunction, FlatMapFunction from org.apache.flink.api.java.functions import KeySelector from utils import constants -from utils.python_test_base import TestBase class Tokenizer(FlatMapFunction): @@ -41,29 +39,20 @@ class Sum(ReduceFunction): return (count1 + count2, val1) -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): +class Main: + def run(self, flink): elements = [(1, 222 if x % 2 == 0 else 333) for x in range(constants.NUM_ELEMENTS_IN_TEST)] - env = self._get_execution_environment() + env = flink.get_execution_environment() env.set_parallelism(2) \ .from_elements(elements) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .reduce(Sum()) \ - .print() - - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(str(result.jobID))) - + .output() -def main(): - Main().run() + result = env.execute() -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py index 43984d0..1e97d05 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py @@ -15,8 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ -import sys -from utils.python_test_base import TestBase from utils.pygeneratorbase import PyGeneratorBase from org.apache.flink.api.common.functions import MapFunction, FlatMapFunction, ReduceFunction from org.apache.flink.api.java.functions import KeySelector @@ -55,12 +53,9 @@ class Selector(KeySelector): return input[1] -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() env.from_collection([3] * 5) \ .map(DummyTupple()) \ .map(MinusOne()) \ @@ -68,16 +63,10 @@ class Main(TestBase): .key_by(Selector()) \ .time_window(milliseconds(5)) \ .reduce(Sum()) \ - .print() - - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(str(result.jobID))) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py index 44e7bfc..c2b0b5b 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py @@ -15,14 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ -import sys from org.apache.flink.api.common.functions import MapFunction, ReduceFunction from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import seconds from utils import constants from utils.pygeneratorbase import PyGeneratorBase -from utils.python_test_base import TestBase class Generator(PyGeneratorBase): @@ -52,26 +50,18 @@ class Sum(ReduceFunction): return (count1 + count2, val1) -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) \ .map(Tokenizer()) \ .key_by(Selector()) \ .time_window(seconds(1)) \ .reduce(Sum()) \ - .print() - - env.execute(True) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py index 36c86c9..76d6c38 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py @@ -17,14 +17,12 @@ 
################################################################################ import os import re -import sys import uuid from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import milliseconds from utils import constants -from utils.python_test_base import TestBase class Tokenizer(FlatMapFunction): @@ -52,31 +50,23 @@ def generate_tmp_text_file(num_lines=100): return tmp_f -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): +class Main: + def run(self, flink): tmp_f = generate_tmp_text_file(constants.NUM_ELEMENTS_IN_TEST) try: - env = self._get_execution_environment() + env = flink.get_execution_environment() env.read_text_file(tmp_f.name) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(100)) \ .reduce(Sum()) \ - .print() + .output() - env.execute(True) + env.execute() finally: tmp_f.close() os.unlink(tmp_f.name) -def main(): - Main().run() - - -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py index 726d69f..54a1be4 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py @@ -16,16 
+16,13 @@ # limitations under the License. ################################################################################ import socket -import sys import threading import time from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import seconds -from utils import constants -from utils import utils -from utils.python_test_base import TestBase +from utils import constants, utils class SocketStringGenerator(threading.Thread): @@ -66,30 +63,22 @@ class Selector(KeySelector): def getKey(self, input): return input[1] -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): +class Main: + def run(self, flink): f_port = utils.gen_free_port() SocketStringGenerator(host='', port=f_port, msg='Hello World', num_iters=constants.NUM_ITERATIONS_IN_TEST).start() time.sleep(0.5) - env = self._get_execution_environment() + env = flink.get_execution_environment() env.socket_text_stream('localhost', f_port) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(seconds(1)) \ .reduce(Sum()) \ - .print() - - env.execute(True) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py index 9052652..7461c72 100644 --- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py @@ -21,7 +21,6 @@ from org.apache.flink.streaming.api.collector.selector import OutputSelector from org.apache.flink.streaming.api.windowing.time.Time import milliseconds from utils import constants -from utils.python_test_base import TestBase class StreamSelector(OutputSelector): @@ -45,12 +44,9 @@ class Selector(KeySelector): def getKey(self, input): return 1 -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() split_window = env.generate_sequence(1, constants.NUM_ITERATIONS_IN_TEST).split(StreamSelector()) @@ -59,22 +55,17 @@ class Main(TestBase): .key_by(Selector()) \ .time_window(milliseconds(10)) \ .reduce(Sum()) \ - .print() + .output() split_window.select('upper_stream') \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(10)) \ .reduce(Sum()) \ - .print() - - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(result.jobID)) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py index d70f87c..e2ed671 100644 --- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py @@ -21,7 +21,6 @@ from org.apache.flink.streaming.api.windowing.time.Time import milliseconds from utils import constants from utils.pygeneratorbase import PyGeneratorBase -from utils.python_test_base import TestBase class Generator(PyGeneratorBase): @@ -48,12 +47,9 @@ class Selector(KeySelector): def getKey(self, input): return 1 -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() seq1 = env.create_python_source(Generator(msg='Hello', num_iters=constants.NUM_ITERATIONS_IN_TEST)) seq2 = env.create_python_source(Generator(msg='World', num_iters=constants.NUM_ITERATIONS_IN_TEST)) seq3 = env.create_python_source(Generator(msg='Happy', num_iters=constants.NUM_ITERATIONS_IN_TEST)) @@ -63,15 +59,10 @@ class Main(TestBase): .key_by(Selector()) \ .time_window(milliseconds(10)) \ .reduce(Sum()) \ - .print() - - result = env.execute("My python union stream test", True) - print("Job completed, job_id={}".format(result.jobID)) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py index 3ace000..1d409b5 100644 --- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py @@ -15,14 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -import sys from org.apache.flink.api.common.functions import MapFunction, ReduceFunction from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import milliseconds from utils import constants from utils.pygeneratorbase import PyGeneratorBase -from utils.python_test_base import TestBase class Person: @@ -61,26 +59,18 @@ class Sum(ReduceFunction): return (count1 + count2, val1) -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) \ .map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(30)) \ .reduce(Sum()) \ - .print() - - env.execute(True) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py index 8a651df..98e22c5 100644 
--- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py @@ -15,14 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -import sys from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.functions.windowing import WindowFunction from org.apache.flink.streaming.api.windowing.time.Time import seconds from utils import constants from utils.pygeneratorbase import PyGeneratorBase -from utils.python_test_base import TestBase class Generator(PyGeneratorBase): @@ -48,25 +46,17 @@ class WindowSum(WindowFunction): collector.collect((key, len(values))) -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): - env = self._get_execution_environment() +class Main: + def run(self, flink): + env = flink.get_execution_environment() env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) \ .key_by(Selector()) \ .time_window(seconds(1)) \ .apply(WindowSum()) \ - .print() - - env.execute(True) - + .output() -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py index 
baad7b3..82ace05 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py @@ -15,10 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -import sys from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction from org.apache.flink.api.java.functions import KeySelector -from org.apache.flink.streaming.python.api.environment import PythonStreamExecutionEnvironment from org.apache.flink.streaming.api.functions.source import SourceFunction from org.apache.flink.streaming.api.windowing.time.Time import milliseconds @@ -57,28 +55,17 @@ class Sum(ReduceFunction): class Main: - def __init__(self, local): - self._local = local - - def run(self): - env = PythonStreamExecutionEnvironment.get_execution_environment() + def run(self, flink): + env = flink.get_execution_environment() env.create_python_source(Generator(num_iters=100)) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(30)) \ .reduce(Sum()) \ - .print() - - - print("Execution mode: {}".format("LOCAL" if self._local else "REMOTE")) - env.execute(self._local) - + .output() -def main(): - local = False if len(sys.argv) > 1 and sys.argv[1] == "remote" else True - Main(local).run() + env.execute() -if __name__ == '__main__': - main() - print("Job completed ({})\n".format(sys.argv)) +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py index 9a34d06..1694c05 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py @@ -21,7 +21,6 @@ from org.apache.flink.core.fs.FileSystem import WriteMode from org.apache.flink.streaming.api.windowing.time.Time import milliseconds from utils import constants -from utils.python_test_base import TestBase class Tokenizer(FlatMapFunction): @@ -41,14 +40,11 @@ class Selector(KeySelector): return input[1] -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - def run(self): +class Main: + def run(self, flink): elements = ["aa" if iii % 2 == 0 else "bbb" for iii in range(constants.NUM_ITERATIONS_IN_TEST)] - env = self._get_execution_environment() + env = flink.get_execution_environment() env.from_collection(elements) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ @@ -56,13 +52,8 @@ class Main(TestBase): .reduce(Sum()) \ .write_as_text("/tmp/flink_write_as_text", WriteMode.OVERWRITE) - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(result.jobID)) - - -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py index aa322ce..9e8af9d 100644 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py @@ -25,7 +25,6 @@ from org.apache.flink.streaming.util.serialization import SerializationSchema from utils import constants from utils import utils -from utils.python_test_base import TestBase class Tokenizer(FlatMapFunction): @@ -74,19 +73,15 @@ class SocketStringReader(threading.Thread): serversocket.close() -class Main(TestBase): - def __init__(self): - super(Main, self).__init__() - - - def run(self): +class Main: + def run(self, flink): port = utils.gen_free_port() SocketStringReader('', port, constants.NUM_ITERATIONS_IN_TEST).start() time.sleep(0.5) elements = ["aa" if iii % 2 == 0 else "bbb" for iii in range(constants.NUM_ITERATIONS_IN_TEST)] - env = self._get_execution_environment() + env = flink.get_execution_environment() env.from_collection(elements) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ @@ -94,13 +89,8 @@ class Main(TestBase): .reduce(Sum()) \ .write_to_socket('localhost', port, ToStringSchema()) - result = env.execute("MyJob", True) - print("Job completed, job_id={}".format(result.jobID)) - - -def main(): - Main().run() + env.execute() -if __name__ == '__main__': - main() +def main(flink): + Main().run(flink) http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py deleted file mode 100644 index 7baef33..0000000 --- a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py +++ /dev/null @@ -1,35 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ -import sys -from org.apache.flink.api.java.utils import ParameterTool -from org.apache.flink.streaming.python.api.environment import PythonStreamExecutionEnvironment - - -class TestBase(object): - _params = ParameterTool.fromArgs(sys.argv[1:]) if len(sys.argv[1:]) > 0 else None - - def __init__(self): - pass - - def _get_execution_environment(self): - if TestBase._params: - print("Create local execution environment with provided configurations") - return PythonStreamExecutionEnvironment.create_local_execution_environment(TestBase._params.getConfiguration()) - else: - print("Get execution environment") - return PythonStreamExecutionEnvironment.get_execution_environment() http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/resources/log4j-test.properties b/flink-libraries/flink-streaming-python/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..2226f68 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
