ahmedabu98 commented on code in PR #28272:
URL: https://github.com/apache/beam/pull/28272#discussion_r1316331843


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteIT.java:
##########
@@ -81,15 +103,35 @@ public void processElement(ProcessContext c) {
     }
   }
 
-  private GenerateSequence stream(int rowCount) {
-    int timestampIntervalInMilliseconds = 10;
-    return GenerateSequence.from(0)
-        .to(rowCount)
-        .withRate(1, Duration.millis(timestampIntervalInMilliseconds));
+  static class UnboundedStream extends PTransform<PBegin, PCollection<Long>> {

Review Comment:
   Nice, this fixes the test cases that were supposed to be streaming but 
weren't.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java:
##########
@@ -206,7 +206,15 @@ private PendingJobData startWriteRename(
     // Make sure each destination table gets a unique job id.
     String jobIdPrefix =
         BigQueryResourceNaming.createJobIdWithDestination(
-            c.sideInput(jobIdToken), finalTableDestination, -1, 
c.pane().getIndex());
+            c.sideInput(jobIdToken), finalTableDestination, -1);
+
+    if (isFirstPane) {

Review Comment:
   Unrelated to this PR but we're actually seeing a bug due to relying on the 
pane index in this code path: #28309



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java:
##########
@@ -34,28 +37,57 @@
  */
 public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {
 
-  Instant startTimestamp = Instant.now();
-  Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
-  Duration fireInterval = Duration.standardMinutes(1);
+  Instant startTimestamp;
+  Instant stopTimestamp;
+  @Nullable Duration stopDuration;
+  Duration fireInterval;
   boolean applyWindowing = false;
   boolean catchUpToNow = true;
 
-  private PeriodicImpulse() {}
+  private PeriodicImpulse() {
+    this.startTimestamp = Instant.now();
+    this.stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    this.fireInterval = Duration.standardMinutes(1);
+  }
 
   public static PeriodicImpulse create() {
     return new PeriodicImpulse();
   }
 
+  /**
+   * Assign a timestamp when the pipeliene starts to produce data.
+   *
+   * <p>Cannot be used along with {@link #stopAfter}.
+   */
   public PeriodicImpulse startAt(Instant startTime) {
+    checkArgument(stopDuration == null, "startAt and stopAfter cannot be set 
at the same time");
     this.startTimestamp = startTime;
     return this;
   }
 
+  /**
+   * Assign a timestamp when the pipeliene stops producing data.
+   *
+   * <p>Cannot be used along with {@link #stopAfter}.
+   */
   public PeriodicImpulse stopAt(Instant stopTime) {
+    checkArgument(stopDuration == null, "stopAt and stopAfter cannot be set at 
the same time");
     this.stopTimestamp = stopTime;
     return this;
   }
 
+  /**
+   * <b><i>For internal use only; no backwards-compatibility 
guarantees.</i></b>
+   *
+   * <p>Assign a time interval at which the pipeliene produces data. This is 
different from setting
+   * {@link #startAt} and {@link #stopAt}, as the first timestamp is 
determined at run time
+   * (pipeline starts processing).
+   */
+  public PeriodicImpulse stopAfter(Duration duration) {

Review Comment:
   mark with `@Internal`?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java:
##########
@@ -47,21 +48,22 @@ class BigQueryResourceNaming {
    * @param prefix A prefix generated in {@link 
BigQueryResourceNaming::createJobIdPrefix}.
    * @param tableDestination A descriptor of the destination table.
    * @param partition A partition number in the destination table.
-   * @param index
-   * @return
+   * @return a generated jobId.
    */
   static String createJobIdWithDestination(
-      String prefix, TableDestination tableDestination, int partition, long 
index) {
+      String prefix, TableDestination tableDestination, int partition) {
     // Job ID must be different for each partition of each table.
     String destinationHash =
-        
Hashing.murmur3_128().hashUnencodedChars(tableDestination.toString()).toString();
-    String jobId = String.format("%s_%s", prefix, destinationHash);
+        Hashing.murmur3_128()
+            .hashUnencodedChars(tableDestination.toString())
+            .toString()
+            .substring(0, 16);
+    // add randomness to jobId to avoid conflict
+    String jobId =
+        String.format("%s_%s_%s", prefix, destinationHash, 
randomUUIDString().substring(0, 16));

Review Comment:
   I'm worried this may cause duplication of data due to bundle retry. 
   
   Let's say a bundle fails during load/copy execution and the BQ job was 
successful. Beam would process the bundle again and these lines will create a 
fresh job ID. Under previous circumstances, this job ID would be recognized by 
BQ as a recently successful job and will ignore it. But now since the ID is 
always new, it will execute the job and we will end up with duplicate data.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java:
##########
@@ -34,28 +37,57 @@
  */
 public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {
 
-  Instant startTimestamp = Instant.now();
-  Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
-  Duration fireInterval = Duration.standardMinutes(1);
+  Instant startTimestamp;
+  Instant stopTimestamp;
+  @Nullable Duration stopDuration;
+  Duration fireInterval;
   boolean applyWindowing = false;
   boolean catchUpToNow = true;
 
-  private PeriodicImpulse() {}
+  private PeriodicImpulse() {
+    this.startTimestamp = Instant.now();
+    this.stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    this.fireInterval = Duration.standardMinutes(1);
+  }
 
   public static PeriodicImpulse create() {
     return new PeriodicImpulse();
   }
 
+  /**
+   * Assign a timestamp when the pipeliene starts to produce data.
+   *
+   * <p>Cannot be used along with {@link #stopAfter}.
+   */
   public PeriodicImpulse startAt(Instant startTime) {
+    checkArgument(stopDuration == null, "startAt and stopAfter cannot be set 
at the same time");
     this.startTimestamp = startTime;
     return this;
   }
 
+  /**
+   * Assign a timestamp when the pipeliene stops producing data.
+   *
+   * <p>Cannot be used along with {@link #stopAfter}.
+   */
   public PeriodicImpulse stopAt(Instant stopTime) {
+    checkArgument(stopDuration == null, "stopAt and stopAfter cannot be set at 
the same time");
     this.stopTimestamp = stopTime;
     return this;
   }
 
+  /**
+   * <b><i>For internal use only; no backwards-compatibility 
guarantees.</i></b>
+   *
+   * <p>Assign a time interval at which the pipeliene produces data. This is 
different from setting
+   * {@link #startAt} and {@link #stopAt}, as the first timestamp is 
determined at run time
+   * (pipeline starts processing).
+   */
+  public PeriodicImpulse stopAfter(Duration duration) {

Review Comment:
   P.S. also add to `catchUpToNow()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to