Repository: beam Updated Branches: refs/heads/master 8af5c28d4 -> defb55405
http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index c16eea2..fe96e87 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -71,6 +71,8 @@ import org.mockito.MockitoAnnotations; public class DoFnInvokersTest { @Rule public ExpectedException thrown = ExpectedException.none(); + @Mock private DoFn<String, String>.StartBundleContext mockStartBundleContext; + @Mock private DoFn<String, String>.FinishBundleContext mockFinishBundleContext; @Mock private DoFn<String, String>.ProcessContext mockProcessContext; @Mock private IntervalWindow mockWindow; @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider; @@ -79,6 +81,10 @@ public class DoFnInvokersTest { public void setUp() { MockitoAnnotations.initMocks(this); when(mockArgumentProvider.window()).thenReturn(mockWindow); + when(mockArgumentProvider.startBundleContext(Matchers.<DoFn>any())) + .thenReturn(mockStartBundleContext); + when(mockArgumentProvider.finishBundleContext(Matchers.<DoFn>any())) + .thenReturn(mockFinishBundleContext); when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext); } @@ -233,10 +239,10 @@ public class DoFnInvokersTest { public void processElement(ProcessContext c) {} @StartBundle - public void startBundle(Context c) {} + public void startBundle(StartBundleContext c) {} @FinishBundle - public void finishBundle(Context c) {} + public void finishBundle(FinishBundleContext c) {} @Setup public void before() {} @@ -247,12 +253,12 @@ public class DoFnInvokersTest { MockFn fn = mock(MockFn.class); DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn); invoker.invokeSetup(); - invoker.invokeStartBundle(mockProcessContext); - invoker.invokeFinishBundle(mockProcessContext); + invoker.invokeStartBundle(mockStartBundleContext); + invoker.invokeFinishBundle(mockFinishBundleContext); invoker.invokeTeardown(); verify(fn).before(); - verify(fn).startBundle(mockProcessContext); - verify(fn).finishBundle(mockProcessContext); + verify(fn).startBundle(mockStartBundleContext); + verify(fn).finishBundle(mockFinishBundleContext); verify(fn).after(); } @@ -601,7 +607,7 @@ public class DoFnInvokersTest { DoFnInvokers.invokerFor( new DoFn<Integer, Integer>() { @StartBundle - public void startBundle(@SuppressWarnings("unused") Context c) { + public void startBundle(@SuppressWarnings("unused") StartBundleContext c) { throw new IllegalArgumentException("bogus"); } @@ -619,7 +625,7 @@ public class DoFnInvokersTest { DoFnInvokers.invokerFor( new DoFn<Integer, Integer>() { @FinishBundle - public void finishBundle(@SuppressWarnings("unused") Context c) { + public void finishBundle(@SuppressWarnings("unused") FinishBundleContext c) { throw new IllegalArgumentException("bogus"); } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index d6cc4f6..f099d5d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -76,13 +76,14 @@ public class DoFnSignaturesTest { @Test public void testBadExtraContext() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Must take a single argument of type DoFn<Integer, String>.Context"); + thrown.expectMessage( + "Must take a single argument of type DoFn<Integer, String>.StartBundleContext"); - DoFnSignatures.analyzeBundleMethod( + DoFnSignatures.analyzeStartBundleMethod( errors(), TypeDescriptor.of(FakeDoFn.class), new DoFnSignaturesTestUtils.AnonymousMethod() { - void method(DoFn<Integer, String>.Context c, int n) {} + void method(DoFn<Integer, String>.StartBundleContext c, int n) {} }.getMethod(), TypeDescriptor.of(Integer.class), TypeDescriptor.of(String.class)); @@ -112,8 +113,8 @@ public class DoFnSignaturesTest { public void testMultipleFinishBundleMethods() throws Exception { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Found multiple methods annotated with @FinishBundle"); - thrown.expectMessage("bar(Context)"); - thrown.expectMessage("baz(Context)"); + thrown.expectMessage("bar(FinishBundleContext)"); + thrown.expectMessage("baz(FinishBundleContext)"); thrown.expectMessage(getClass().getName() + "$"); DoFnSignatures.getSignature( new DoFn<String, String>() { @@ -121,10 +122,10 @@ public class DoFnSignaturesTest { public void foo(ProcessContext context) {} @FinishBundle - public void bar(Context context) {} + public void bar(FinishBundleContext context) {} @FinishBundle - public void baz(Context context) {} + public void baz(FinishBundleContext context) {} }.getClass()); } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index 1cdd087..6d5e230 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -71,6 +71,7 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; @@ -304,20 +305,21 @@ public class ProcessBundleHandlerTest { private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput"); private static final TupleTag<String> additionalOutput = new TupleTag<>("output"); - @StartBundle - public void startBundle(Context context) { - context.output("StartBundle"); - } + private BoundedWindow window; @ProcessElement - public void processElement(ProcessContext context) { + public void processElement(ProcessContext context, BoundedWindow window) { context.output("MainOutput" + context.element()); context.output(additionalOutput, "AdditionalOutput" + context.element()); + this.window = window; } @FinishBundle - public void finishBundle(Context context) { - context.output("FinishBundle"); + public void finishBundle(FinishBundleContext context) { + if (window != null) { + context.output("FinishBundle", window.maxTimestamp(), window); + window = null; + } } } @@ -411,7 +413,6 @@ public class ProcessBundleHandlerTest { finishFunctions::add); Iterables.getOnlyElement(startFunctions).run(); - assertThat(mainOutputValues, contains(valueInGlobalWindow("StartBundle"))); mainOutputValues.clear(); assertEquals(newConsumers.keySet(), http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 0a3b900..f6ceef2 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; - import java.io.IOException; import java.io.Serializable; import java.net.MalformedURLException; @@ -40,7 +39,6 @@ import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; import javax.annotation.Nullable; - import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -757,7 +755,7 @@ public class ElasticsearchIO { } @StartBundle - public void startBundle(Context context) throws Exception { + public void startBundle(StartBundleContext context) throws Exception { batch = new ArrayList<>(); currentBatchSizeBytes = 0; } @@ -769,12 +767,16 @@ public class ElasticsearchIO { currentBatchSizeBytes += document.getBytes().length; if (batch.size() >= spec.getMaxBatchSize() || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) { - finishBundle(context); + flushBatch(); } } @FinishBundle - public void finishBundle(Context context) throws Exception { + public void finishBundle(FinishBundleContext context) throws Exception { + flushBatch(); + } + + private void flushBatch() throws IOException { if (batch.isEmpty()) { return; } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java index fd5f396..f267976 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java @@ -56,7 +56,7 @@ class StreamingWriteFn /** Prepares a target BigQuery table. */ @StartBundle - public void startBundle(Context context) { + public void startBundle() { tableRows = new HashMap<>(); uniqueIdsForTableRows = new HashMap<>(); } @@ -75,7 +75,7 @@ class StreamingWriteFn /** Writes the accumulated rows into BigQuery with streaming API. */ @FinishBundle - public void finishBundle(Context context) throws Exception { + public void finishBundle(FinishBundleContext context) throws Exception { BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) { TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey()); http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java index 284691e..cd88222 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java @@ -40,7 +40,7 @@ class TagWithUniqueIds private transient long sequenceNo = 0L; @StartBundle - public void startBundle(Context context) { + public void startBundle() { randomUUID = UUID.randomUUID().toString(); } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index e90b974..70aa135 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +49,10 @@ class WriteBundlesToFiles<DestinationT> // Map from tablespec to a writer for that table. private transient Map<DestinationT, TableRowWriter> writers; + private transient Map<DestinationT, BoundedWindow> writerWindows; private final String stepUuid; + /** * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file, * and encapsulates the table it is destined to as well as the file byte size. @@ -110,14 +113,15 @@ class WriteBundlesToFiles<DestinationT> } @StartBundle - public void startBundle(Context c) { + public void startBundle() { // This must be done each bundle, as by default the {@link DoFn} might be reused between // bundles. this.writers = Maps.newHashMap(); + this.writerWindows = Maps.newHashMap(); } @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { String tempFilePrefix = resolveTempLocation( c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid); TableRowWriter writer = writers.get(c.element().getKey()); @@ -125,6 +129,7 @@ class WriteBundlesToFiles<DestinationT> writer = new TableRowWriter(tempFilePrefix); writer.open(UUID.randomUUID().toString()); writers.put(c.element().getKey(), writer); + writerWindows.put(c.element().getKey(), window); LOG.debug("Done opening writer {}", writer); } try { @@ -143,11 +148,15 @@ class WriteBundlesToFiles<DestinationT> } @FinishBundle - public void finishBundle(Context c) throws Exception { + public void finishBundle(FinishBundleContext c) throws Exception { for (Map.Entry<DestinationT, TableRowWriter> entry : writers.entrySet()) { TableRowWriter.Result result = entry.getValue().close(); - c.output(new Result<>(result.resourceId.toString(), result.byteSize, entry.getKey())); + c.output( + new Result<>(result.resourceId.toString(), result.byteSize, entry.getKey()), + writerWindows.get(entry.getKey()).maxTimestamp(), + writerWindows.get(entry.getKey())); } writers.clear(); + writerWindows.clear(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 69fac68..0e97c12 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -572,7 +572,7 @@ public class BigtableIO { } @StartBundle - public void startBundle(Context c) throws IOException { + public void startBundle(StartBundleContext c) throws IOException { if (bigtableWriter == null) { bigtableWriter = bigtableServiceFactory.apply( c.getPipelineOptions()).openForWriting(tableId); @@ -589,7 +589,7 @@ public class BigtableIO { } @FinishBundle - public void finishBundle(Context c) throws Exception { + public void finishBundle() throws Exception { bigtableWriter.flush(); checkForFailures(); LOG.info("Wrote {} records", recordsWritten); http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index f619429..fd4fccf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -704,7 +704,7 @@ public class DatastoreV1 { } @StartBundle - public void startBundle(Context c) throws Exception { + public void startBundle(StartBundleContext c) throws Exception { datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), v1Options.getProjectId()); } @@ -748,7 +748,7 @@ public class DatastoreV1 { } @StartBundle - public void startBundle(Context c) throws Exception { + public void startBundle(StartBundleContext c) throws Exception { datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(), options.getLocalhost()); querySplitter = datastoreFactory.getQuerySplitter(); @@ -821,7 +821,7 @@ public class DatastoreV1 { } @StartBundle - public void startBundle(Context c) throws Exception { + public void startBundle(StartBundleContext c) throws Exception { datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(), options.getLocalhost()); } @@ -1145,7 +1145,7 @@ public class DatastoreV1 { } @StartBundle - public void startBundle(Context c) { + public void startBundle(StartBundleContext c) { datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId.get(), localhost); } @@ -1158,7 +1158,7 @@ public class DatastoreV1 { } @FinishBundle - public void finishBundle(Context c) throws Exception { + public void finishBundle() throws Exception { if (!mutations.isEmpty()) { flushBatch(); } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index e023ad0..fa2d20f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -888,7 +888,7 @@ public class PubsubIO { private transient PubsubClient pubsubClient; @StartBundle - public void startBundle(Context c) throws IOException { + public void startBundle(StartBundleContext c) throws IOException { this.output = new ArrayList<>(); // NOTE: idAttribute is ignored. this.pubsubClient = @@ -911,7 +911,7 @@ public class PubsubIO { } @FinishBundle - public void finishBundle(Context c) throws IOException { + public void finishBundle() throws IOException { if (!output.isEmpty()) { publish(); } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 9d97e91..031d9a0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -255,7 +255,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, } @StartBundle - public void startBundle(Context c) throws Exception { + public void startBundle(StartBundleContext c) throws Exception { checkState(pubsubClient == null, "startBundle invoked without prior finishBundle"); pubsubClient = pubsubFactory.newClient(timestampAttribute, idAttribute, c.getPipelineOptions().as(PubsubOptions.class)); @@ -287,7 +287,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, } @FinishBundle - public void finishBundle(Context c) throws Exception { + public void finishBundle() throws Exception { pubsubClient.close(); pubsubClient = null; } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java index 86a9246..ef6556e 100644 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java @@ -223,6 +223,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { // Writer that will write the records in this bundle. Lazily // initialized in processElement. private Writer<T, WriteT> writer = null; + private BoundedWindow window; private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView; WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) { @@ -243,6 +244,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { } else { writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); } + this.window = window; LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); } try { @@ -265,12 +267,13 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { } @FinishBundle - public void finishBundle(Context c) throws Exception { + public void finishBundle(FinishBundleContext c) throws Exception { if (writer != null) { WriteT result = writer.close(); - c.output(result); + c.output(result, window.maxTimestamp(), window); // Reset state in case of reuse. writer = null; + window = null; } } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index eee8927..3c42da9 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -634,11 +634,6 @@ public class HBaseIO { recordsWritten = 0; } - @StartBundle - public void startBundle(Context c) throws Exception { - - } - @ProcessElement public void processElement(ProcessContext ctx) throws Exception { KV<byte[], Iterable<Mutation>> record = ctx.element(); @@ -651,7 +646,7 @@ public class HBaseIO { } @FinishBundle - public void finishBundle(Context c) throws Exception { + public void finishBundle() throws Exception { mutator.flush(); } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 2d48236..2eb53dd 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -25,6 +25,7 @@ import java.io.Serializable; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Random; import javax.annotation.Nullable; @@ -480,7 +481,7 @@ public class JdbcIO { } @StartBundle - public void startBundle(Context context) { + public void startBundle() { batchCount = 0; } @@ -495,12 +496,16 @@ public class JdbcIO { batchCount++; if (batchCount >= DEFAULT_BATCH_SIZE) { - finishBundle(context); + executeBatch(); } } @FinishBundle - public void finishBundle(Context context) throws Exception { + public void finishBundle() throws Exception { + executeBatch(); + } + + private void executeBatch() throws SQLException { if (batchCount > 0) { preparedStatement.executeBatch(); connection.commit(); http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 813e051..4493e56 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -674,7 +674,7 @@ public class JmsIO { } @StartBundle - public void startBundle(Context c) throws Exception { + public void startBundle() throws Exception { if (producer == null) { if (spec.getUsername() != null) { this.connection = @@ -703,13 +703,13 @@ public class JmsIO { TextMessage message = session.createTextMessage(value); producer.send(message); } catch (Exception t) { - finishBundle(null); + finishBundle(); throw t; } } @FinishBundle - public void finishBundle(Context c) throws Exception { + public void finishBundle() throws Exception { producer.close(); producer = null; session.close(); http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 8ab33d1..f4de76a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1632,7 +1632,7 @@ public class KafkaIO { } @FinishBundle - public void finishBundle(Context c) throws IOException { + public void finishBundle() throws IOException { producer.flush(); checkForFailures(); } http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 940d875..0868ed4 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -641,7 +641,7 @@ public class MongoDbGridFSIO { } @StartBundle - public void startBundle(Context context) { + public void startBundle() { gridFsFile = gridfs.createFile(spec.filename()); if (spec.chunkSize() != null) { gridFsFile.setChunkSize(spec.chunkSize()); @@ -656,7 +656,7 @@ public class MongoDbGridFSIO { } @FinishBundle - public void finishBundle(Context context) throws Exception { + public void finishBundle() throws Exception { if (gridFsFile != null) { outputStream.flush(); outputStream.close(); http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index f8edbf1..7236a50 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -466,7 +466,7 @@ public class MongoDbIO { } @StartBundle - public void startBundle(Context ctx) throws Exception { + public void startBundle() throws Exception { batch = new ArrayList<>(); } @@ -476,12 +476,16 @@ public class MongoDbIO { // before inserting (will assign an id). batch.add(new Document(ctx.element())); if (batch.size() >= spec.batchSize()) { - finishBundle(ctx); + flush(); } } @FinishBundle - public void finishBundle(Context ctx) throws Exception { + public void finishBundle() throws Exception { + flush(); + } + + private void flush() { MongoDatabase mongoDatabase = client.getDatabase(spec.database()); MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
