This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9b85f718cb8 [yaml] Preserve windowing for windowed input when using
FileIO Java providers (#32586)
9b85f718cb8 is described below
commit 9b85f718cb856c03132eaa48845954c8922bab71
Author: Jeff Kinard <[email protected]>
AuthorDate: Mon Sep 30 17:36:11 2024 -0400
[yaml] Preserve windowing for windowed input when using FileIO Java
providers (#32586)
---
.../sdk/io/csv/providers/CsvWriteTransformProvider.java | 17 +++++++++++++----
.../io/json/providers/JsonWriteTransformProvider.java | 13 +++++++++++--
2 files changed, 24 insertions(+), 6 deletions(-)
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
index f4d54c408cf..89e8211026b 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.commons.csv.CSVFormat;
@@ -134,10 +135,18 @@ public class CsvWriteTransformProvider
if (configuration.getDelimiter() != null) {
format = format.withDelimiter(configuration.getDelimiter().charAt(0));
}
- WriteFilesResult<?> result =
- input
- .get(INPUT_ROWS_TAG)
- .apply(CsvIO.writeRows(configuration.getPath(),
format).withSuffix(""));
+
+ // Preserve input windowing
+ CsvIO.Write<Row> writeTransform =
+ CsvIO.writeRows(configuration.getPath(), format).withSuffix("");
+ if (!input
+ .get(INPUT_ROWS_TAG)
+ .getWindowingStrategy()
+ .equals(WindowingStrategy.globalDefault())) {
+ writeTransform = writeTransform.withWindowedWrites();
+ }
+
+ WriteFilesResult<?> result =
input.get(INPUT_ROWS_TAG).apply(writeTransform);
Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
return PCollectionRowTuple.of(
WRITE_RESULTS,
diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
index 9e030821e5c..a522d176fac 100644
--- a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
+++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
/**
@@ -121,8 +122,16 @@ public class JsonWriteTransformProvider
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
- WriteFilesResult<?> result =
-
input.get(INPUT_ROWS_TAG).apply(JsonIO.writeRows(configuration.getPath()).withSuffix(""));
+ // Preserve input windowing
+ JsonIO.Write<Row> writeTransform =
JsonIO.writeRows(configuration.getPath()).withSuffix("");
+ if (!input
+ .get(INPUT_ROWS_TAG)
+ .getWindowingStrategy()
+ .equals(WindowingStrategy.globalDefault())) {
+ writeTransform = writeTransform.withWindowedWrites();
+ }
+
+ WriteFilesResult<?> result =
input.get(INPUT_ROWS_TAG).apply(writeTransform);
Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
return PCollectionRowTuple.of(
WRITE_RESULTS,