[sink] generate unique id for writer initialization

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/336d394e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/336d394e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/336d394e

Branch: refs/heads/master
Commit: 336d394e9cb4d68143def6027574b56f617080d2
Parents: 8e859e2
Author: Max <[email protected]>
Authored: Tue Jan 19 10:38:49 2016 +0100
Committer: Davor Bonaci <[email protected]>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../flink/dataflow/translation/wrappers/SinkOutputFormat.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336d394e/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
index d87b240..b10c86f 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
@@ -23,13 +23,16 @@ import com.google.cloud.dataflow.sdk.io.Sink;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import 
com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
 import com.google.cloud.dataflow.sdk.transforms.Write;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
+import java.util.UUID;
 
 /**
  * Wrapper class to use generic Write.Bound transforms as sinks.
@@ -44,6 +47,8 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
        private Sink.WriteOperation<T, ?> writeOperation;
        private Sink.Writer<T, ?> writer;
 
+       private AbstractID uid = new AbstractID();
+
        public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions 
pipelineOptions) {
                this.sink = extractSink(transform);
                this.pipelineOptions = 
Preconditions.checkNotNull(pipelineOptions);
@@ -80,7 +85,7 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
                        throw new IOException("Couldn't create writer.", e);
                }
                try {
-                       writer.open(String.valueOf(taskNumber));
+                       writer.open(uid + "-" + String.valueOf(taskNumber));
                } catch (Exception e) {
                        throw new IOException("Couldn't open writer.", e);
                }

Reply via email to