This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b85f718cb8 [yaml] Preserve windowing for windowed input when using FileIO Java providers (#32586)
9b85f718cb8 is described below

commit 9b85f718cb856c03132eaa48845954c8922bab71
Author: Jeff Kinard <[email protected]>
AuthorDate: Mon Sep 30 17:36:11 2024 -0400

    [yaml] Preserve windowing for windowed input when using FileIO Java providers (#32586)
---
 .../sdk/io/csv/providers/CsvWriteTransformProvider.java | 17 +++++++++++++----
 .../io/json/providers/JsonWriteTransformProvider.java   | 13 +++++++++++--
 2 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
index f4d54c408cf..89e8211026b 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.commons.csv.CSVFormat;
 
@@ -134,10 +135,18 @@ public class CsvWriteTransformProvider
       if (configuration.getDelimiter() != null) {
         format = format.withDelimiter(configuration.getDelimiter().charAt(0));
       }
-      WriteFilesResult<?> result =
-          input
-              .get(INPUT_ROWS_TAG)
-              .apply(CsvIO.writeRows(configuration.getPath(), format).withSuffix(""));
+
+      // Preserve input windowing
+      CsvIO.Write<Row> writeTransform =
+          CsvIO.writeRows(configuration.getPath(), format).withSuffix("");
+      if (!input
+          .get(INPUT_ROWS_TAG)
+          .getWindowingStrategy()
+          .equals(WindowingStrategy.globalDefault())) {
+        writeTransform = writeTransform.withWindowedWrites();
+      }
+
+      WriteFilesResult<?> result = input.get(INPUT_ROWS_TAG).apply(writeTransform);
       Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
       return PCollectionRowTuple.of(
           WRITE_RESULTS,
diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
index 9e030821e5c..a522d176fac 100644
--- a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
+++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 
 /**
@@ -121,8 +122,16 @@ public class JsonWriteTransformProvider
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      WriteFilesResult<?> result =
-          input.get(INPUT_ROWS_TAG).apply(JsonIO.writeRows(configuration.getPath()).withSuffix(""));
+      // Preserve input windowing
+      JsonIO.Write<Row> writeTransform = JsonIO.writeRows(configuration.getPath()).withSuffix("");
+      if (!input
+          .get(INPUT_ROWS_TAG)
+          .getWindowingStrategy()
+          .equals(WindowingStrategy.globalDefault())) {
+        writeTransform = writeTransform.withWindowedWrites();
+      }
+
+      WriteFilesResult<?> result = input.get(INPUT_ROWS_TAG).apply(writeTransform);
       Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
       return PCollectionRowTuple.of(
           WRITE_RESULTS,

Reply via email to