[BEAM-1886] Remove TextIO override in Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c030d49 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c030d49 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c030d49 Branch: refs/heads/master Commit: 0c030d49a79ce20e25a8af3110442130dab8899c Parents: 4121ec4 Author: JingsongLi <[email protected]> Authored: Fri Apr 14 11:22:43 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Apr 20 18:01:19 2017 +0200 ---------------------------------------------------------------------- .../FlinkStreamingTransformTranslators.java | 55 -------------------- .../flink/streaming/GroupByNullKeyTest.java | 11 ++-- 2 files changed, 8 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0c030d49/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 123d5e7..71f315d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -50,7 +50,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -80,10 +79,8 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -92,8 +89,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This class contains all the mappings between Beam and Flink @@ -116,7 +111,6 @@ class FlinkStreamingTransformTranslators { static { TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator()); TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); - TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator()); TRANSLATORS.put( @@ -145,55 +139,6 @@ class FlinkStreamingTransformTranslators { // Transformation Implementations // -------------------------------------------------------------------------------------------- - private static class TextIOWriteBoundStreamingTranslator - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> { - - private static final Logger LOG = - LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); - - @Override - public void translateNode( - TextIO.Write.Bound transform, - FlinkStreamingTranslationContext context) { - PValue input = context.getInput(transform); - DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input); - - String filenamePrefix = transform.getFilenamePrefix(); - String filenameSuffix = transform.getFilenameSuffix(); - boolean needsValidation = transform.needsValidation(); - int numShards = transform.getNumShards(); - String shardNameTemplate = transform.getShardNameTemplate(); - - // TODO: Implement these. We need Flink support for this. - LOG.warn( - "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", - needsValidation); - LOG.warn( - "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", - filenameSuffix); - LOG.warn( - "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", - shardNameTemplate); - - DataStream<String> dataSink = inputDataStream - .flatMap(new FlatMapFunction<WindowedValue<String>, String>() { - @Override - public void flatMap( - WindowedValue<String> value, - Collector<String> out) - throws Exception { - out.collect(value.getValue()); - } - }); - DataStreamSink<String> output = - dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE); - - if (numShards > 0) { - output.setParallelism(numShards); - } - } - } - private static class UnboundedReadSourceTranslator<T> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> { http://git-wip-us.apache.org/repos/asf/beam/blob/0c030d49/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java index 663b910..82d9f4f 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.flink.streaming; import com.google.common.base.Joiner; +import java.io.File; import java.io.Serializable; import java.util.Arrays; import org.apache.beam.runners.flink.FlinkTestPipeline; @@ -41,7 +42,7 @@ import org.joda.time.Instant; */ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable { - + protected String resultDir; protected String resultPath; static final String[] EXPECTED_RESULT = new String[] { @@ -53,12 +54,16 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri @Override protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + // Beam Write will add shard suffix to fileName, see ShardNameTemplate. + // So tempFile need have a parent to compare. + File resultParent = createAndRegisterTempFile("result"); + resultDir = resultParent.toURI().toString(); + resultPath = new File(resultParent, "file.txt").getAbsolutePath(); } @Override protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir); } /**
