[cleanup] remove obsolete code
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0cb5f07 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0cb5f07 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0cb5f07 Branch: refs/heads/master Commit: f0cb5f07361f6e6eca30fa66a1d80d205ee7d2b8 Parents: 602d8fe Author: Max <[email protected]> Authored: Wed Feb 17 13:23:26 2016 +0100 Committer: Davor Bonaci <[email protected]> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- .../translation/FlinkBatchTransformTranslators.java | 3 ++- .../translation/wrappers/SourceInputFormat.java | 3 +-- .../wrappers/streaming/FlinkAbstractParDoWrapper.java | 14 -------------- .../flink/dataflow/JoinExamplesITCase.java | 7 ------- 4 files changed, 3 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java index 9a43d05..d5c09b2 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java @@ -151,7 +151,8 @@ public class FlinkBatchTransformTranslators { TypeInformation<T> typeInformation = context.getTypeInfo(output); - DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(), new SourceInputFormat<>(source, context.getPipelineOptions(), coder), typeInformation, name); + DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(), + new SourceInputFormat<>(source, context.getPipelineOptions()), typeInformation, name); context.setOutputDataSet(output, dataSource); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java index b3eca96..64dc072 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java @@ -49,10 +49,9 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> private BoundedSource.BoundedReader<T> reader = null; private boolean reachedEnd = true; - public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options, Coder<T> coder) { + public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) { this.initialSource = initialSource; this.options = options; - Coder<T> coder1 = coder; } private void writeObject(ObjectOutputStream out) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java index 53bb177..71f9c7f 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -55,20 +55,6 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl this.windowingStrategy = windowingStrategy; } -// protected void writeObject(ObjectOutputStream out) -// throws IOException, ClassNotFoundException { -// out.defaultWriteObject(); -// ObjectMapper mapper = new ObjectMapper(); -// mapper.writeValue(out, options); -// } -// -// protected void readObject(ObjectInputStream in) -// throws IOException, ClassNotFoundException { -// in.defaultReadObject(); -// ObjectMapper mapper = new ObjectMapper(); -// options = mapper.readValue(in, PipelineOptions.class); -// } - private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { if (this.context == null) { this.context = new DoFnProcessContext(function, outCollector); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java index dfcadc1..ed2ecf5 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java @@ -53,13 +53,6 @@ public class JoinExamplesITCase extends JavaProgramTestBase { }; static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS); - private static final KV<String, String> kv1 = KV.of("VM", - "Date: 20141212, Actor1: LAOS, url: http://www.chicagotribune.com"); - private static final KV<String, String> kv2 = KV.of("BE", - "Date: 20141213, Actor1: AFGHANISTAN, url: http://cnn.com"); - private static final KV<String, String> kv3 = KV.of("BE", "Belgium"); - private static final KV<String, String> kv4 = KV.of("VM", "Vietnam"); - private static final TableRow cc1 = new TableRow() .set("FIPSCC", "VM").set("HumanName", "Vietnam"); private static final TableRow cc2 = new TableRow()
