[sink] generate unique id for writer initialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/336d394e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/336d394e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/336d394e Branch: refs/heads/master Commit: 336d394e9cb4d68143def6027574b56f617080d2 Parents: 8e859e2 Author: Max <[email protected]> Authored: Tue Jan 19 10:38:49 2016 +0100 Committer: Davor Bonaci <[email protected]> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- .../flink/dataflow/translation/wrappers/SinkOutputFormat.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336d394e/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java index d87b240..b10c86f 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java @@ -23,13 +23,16 @@ import com.google.cloud.dataflow.sdk.io.Sink; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions; import com.google.cloud.dataflow.sdk.transforms.Write; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.AbstractID; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Field; +import java.util.UUID; /** * Wrapper class to use generic Write.Bound transforms as sinks. @@ -44,6 +47,8 @@ public class SinkOutputFormat<T> implements OutputFormat<T> { private Sink.WriteOperation<T, ?> writeOperation; private Sink.Writer<T, ?> writer; + private AbstractID uid = new AbstractID(); + public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) { this.sink = extractSink(transform); this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions); @@ -80,7 +85,7 @@ public class SinkOutputFormat<T> implements OutputFormat<T> { throw new IOException("Couldn't create writer.", e); } try { - writer.open(String.valueOf(taskNumber)); + writer.open(uid + "-" + String.valueOf(taskNumber)); } catch (Exception e) { throw new IOException("Couldn't open writer.", e); }
