This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 286e29cb7a5 [Managed Iceberg] Support partitioning time types (year, month, day, hour) (#32939)
286e29cb7a5 is described below
commit 286e29cb7a57c7a716de080167bfaf729251fd45
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Dec 17 16:33:50 2024 -0500
[Managed Iceberg] Support partitioning time types (year, month, day, hour) (#32939)
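
    This lets users write to Iceberg tables whose partition specs use Iceberg's
    time transforms. A minimal sketch of such a spec (field names and the
    `beamSchema` variable are illustrative; `IcebergUtils.beamSchemaToIcebergSchema`,
    `warehouse.createTable`, and the Managed API usage mirror the tests below):

        // Illustrative only: a table partitioned with year/month/day/hour transforms.
        org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
        PartitionSpec spec =
            PartitionSpec.builderFor(icebergSchema)
                .year("y_datetime")    // yearly partitions from a timestamp field
                .month("m_date")       // monthly partitions from a date field
                .day("d_datetime_tz")  // daily partitions from a timestamp(tz) field
                .hour("h_datetime")    // hourly partitions from a timestamp field
                .build();
        warehouse.createTable(TableIdentifier.parse("default.my_table"), icebergSchema, spec);
        // Rows are then written through the Managed transform as usual:
        // input.apply(Managed.write(Managed.ICEBERG).withConfig(config));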
---
.../IO_Iceberg_Integration_Tests.json | 2 +-
CHANGES.md | 1 +
.../beam/sdk/io/iceberg/RecordWriterManager.java | 79 +++++++-
.../beam/sdk/io/iceberg/SerializableDataFile.java | 5 +-
.../apache/beam/sdk/io/iceberg/IcebergIOIT.java | 2 +-
.../IcebergWriteSchemaTransformProviderTest.java | 98 +++++++++
.../sdk/io/iceberg/RecordWriterManagerTest.java | 224 ++++++++++++++++++++-
7 files changed, 402 insertions(+), 9 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index bbdc3a3910e..2160d3c6800 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 3
+ "modification": 5
}
diff --git a/CHANGES.md b/CHANGES.md
index deaa8bfcd47..7707e252961 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 255fce9ece4..4c21a0175ab 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -21,6 +21,11 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.YearMonth;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -31,6 +36,7 @@ import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
@@ -38,14 +44,20 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +102,7 @@ class RecordWriterManager implements AutoCloseable {
final Cache<PartitionKey, RecordWriter> writers;
private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
+ private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
private final List<Exception> exceptions = Lists.newArrayList();
DestinationState(IcebergDestination icebergDestination, Table table) {
@@ -98,6 +111,9 @@ class RecordWriterManager implements AutoCloseable {
this.spec = table.spec();
this.partitionKey = new PartitionKey(spec, schema);
this.table = table;
+ for (PartitionField partitionField : spec.fields()) {
+ partitionFieldMap.put(partitionField.name(), partitionField);
+ }
// build a cache of RecordWriters.
// writers will expire after 1 min of idle time.
@@ -123,7 +139,9 @@ class RecordWriterManager implements AutoCloseable {
throw rethrow;
}
openWriters--;
- dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk));
+ String partitionPath = getPartitionDataPath(pk.toPath(), partitionFieldMap);
+ dataFiles.add(
+ SerializableDataFile.from(recordWriter.getDataFile(), partitionPath));
})
.build();
}
@@ -136,7 +154,7 @@ class RecordWriterManager implements AutoCloseable {
* can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
*/
boolean write(Record record) {
- partitionKey.partition(record);
+ partitionKey.partition(getPartitionableRecord(record));
if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
return false;
@@ -185,8 +203,65 @@ class RecordWriterManager implements AutoCloseable {
e);
}
}
+
+ /**
+ * Resolves an input {@link Record}'s partition values and returns another {@link Record} that
+ * can be applied to the destination's {@link PartitionSpec}.
+ */
+ private Record getPartitionableRecord(Record record) {
+ if (spec.isUnpartitioned()) {
+ return record;
+ }
+ Record output = GenericRecord.create(schema);
+ for (PartitionField partitionField : spec.fields()) {
+ Transform<?, ?> transform = partitionField.transform();
+ Types.NestedField field = schema.findField(partitionField.sourceId());
+ String name = field.name();
+ Object value = record.getField(name);
+ @Nullable Literal<Object> literal = Literal.of(value.toString()).to(field.type());
+ if (literal == null || transform.isVoid() || transform.isIdentity()) {
+ output.setField(name, value);
+ } else {
+ output.setField(name, literal.value());
+ }
+ }
+ return output;
+ }
}
+ /**
+ * Returns an equivalent partition path that is made up of partition data. Needed to reconstruct a
+ * {@link DataFile}.
+ */
+ @VisibleForTesting
+ static String getPartitionDataPath(
+ String partitionPath, Map<String, PartitionField> partitionFieldMap) {
+ if (partitionPath.isEmpty() || partitionFieldMap.isEmpty()) {
+ return partitionPath;
+ }
+ List<String> resolved = new ArrayList<>();
+ for (String partition : Splitter.on('/').splitToList(partitionPath)) {
+ List<String> nameAndValue = Splitter.on('=').splitToList(partition);
+ String name = nameAndValue.get(0);
+ String value = nameAndValue.get(1);
+ String transformName =
+ Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString();
+ if (Transforms.month().toString().equals(transformName)) {
+ int month = YearMonth.parse(value).getMonthValue();
+ value = String.valueOf(month);
+ } else if (Transforms.hour().toString().equals(transformName)) {
+ long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER));
+ value = String.valueOf(hour);
+ }
+ resolved.add(name + "=" + value);
+ }
+ return String.join("/", resolved);
+ }
+
+ private static final DateTimeFormatter HOUR_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+ private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
+
private final Catalog catalog;
private final String filePrefix;
private final long maxFileSize;
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
index 59b45616200..eef2b154d24 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
@@ -116,13 +116,14 @@ abstract class SerializableDataFile {
* Create a {@link SerializableDataFile} from a {@link DataFile} and its associated {@link
* PartitionKey}.
*/
- static SerializableDataFile from(DataFile f, PartitionKey key) {
+ static SerializableDataFile from(DataFile f, String partitionPath) {
+
return SerializableDataFile.builder()
.setPath(f.path().toString())
.setFileFormat(f.format().toString())
.setRecordCount(f.recordCount())
.setFileSizeInBytes(f.fileSizeInBytes())
- .setPartitionPath(key.toPath())
+ .setPartitionPath(partitionPath)
.setPartitionSpecId(f.specId())
.setKeyMetadata(f.keyMetadata())
.setSplitOffsets(f.splitOffsets())
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index c79b0a55005..a060bc16d6c 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -354,7 +354,7 @@ public class IcebergIOIT implements Serializable {
PartitionSpec partitionSpec =
PartitionSpec.builderFor(ICEBERG_SCHEMA)
.identity("bool")
- .identity("modulo_5")
+ .hour("datetime")
.truncate("str", "value_x".length())
.build();
Table table =
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 47dc9aa425d..9834547c474 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -23,14 +23,19 @@ import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
@@ -49,12 +54,16 @@ import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.util.DateTimeUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.ClassRule;
@@ -360,4 +369,93 @@ public class IcebergWriteSchemaTransformProviderTest {
return null;
}
}
+
+ @Test
+ public void testWritePartitionedData() {
+ Schema schema =
+ Schema.builder()
+ .addStringField("str")
+ .addInt32Field("int")
+ .addLogicalTypeField("y_date", SqlTypes.DATE)
+ .addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
+ .addDateTimeField("y_datetime_tz")
+ .addLogicalTypeField("m_date", SqlTypes.DATE)
+ .addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
+ .addDateTimeField("m_datetime_tz")
+ .addLogicalTypeField("d_date", SqlTypes.DATE)
+ .addLogicalTypeField("d_datetime", SqlTypes.DATETIME)
+ .addDateTimeField("d_datetime_tz")
+ .addLogicalTypeField("h_datetime", SqlTypes.DATETIME)
+ .addDateTimeField("h_datetime_tz")
+ .build();
+ org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
+ PartitionSpec spec =
+ PartitionSpec.builderFor(icebergSchema)
+ .identity("str")
+ .bucket("int", 5)
+ .year("y_date")
+ .year("y_datetime")
+ .year("y_datetime_tz")
+ .month("m_date")
+ .month("m_datetime")
+ .month("m_datetime_tz")
+ .day("d_date")
+ .day("d_datetime")
+ .day("d_datetime_tz")
+ .hour("h_datetime")
+ .hour("h_datetime_tz")
+ .build();
+ String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+
+ warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec);
+ Map<String, Object> config =
+ ImmutableMap.of(
+ "table",
+ identifier,
+ "catalog_properties",
+ ImmutableMap.of("type", "hadoop", "warehouse",
warehouse.location));
+
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ long millis = i * 100_00_000_000L;
+ LocalDate localDate = DateTimeUtil.dateFromDays(i * 100);
+ LocalDateTime localDateTime = DateTimeUtil.timestampFromMicros(millis * 1000);
+ DateTime dateTime = new DateTime(millis).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25));
+ Row row =
+ Row.withSchema(schema)
+ .addValues(
+ "str_" + i,
+ i,
+ localDate,
+ localDateTime,
+ dateTime,
+ localDate,
+ localDateTime,
+ dateTime,
+ localDate,
+ localDateTime,
+ dateTime,
+ localDateTime,
+ dateTime)
+ .build();
+ rows.add(row);
+ }
+
+ PCollection<Row> result =
+ testPipeline
+ .apply("Records To Add", Create.of(rows))
+ .setRowSchema(schema)
+ .apply(Managed.write(Managed.ICEBERG).withConfig(config))
+ .get(SNAPSHOTS_TAG);
+
+ PAssert.that(result)
+ .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
+ testPipeline.run().waitUntilFinish();
+
+ Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
+ PCollection<Row> readRows =
+ p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
+ PAssert.that(readRows).containsInAnyOrder(rows);
+ p.run();
+ }
}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 2bce390e099..5168f71fef9 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -27,9 +27,14 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -39,6 +44,7 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
@@ -46,6 +52,8 @@ import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -85,9 +93,14 @@ public class RecordWriterManagerTest {
private WindowedValue<IcebergDestination> getWindowedDestination(
String tableName, @Nullable PartitionSpec partitionSpec) {
+ return getWindowedDestination(tableName, ICEBERG_SCHEMA, partitionSpec);
+ }
+
+ private WindowedValue<IcebergDestination> getWindowedDestination(
+ String tableName, org.apache.iceberg.Schema schema, @Nullable PartitionSpec partitionSpec) {
TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName);
- warehouse.createTable(tableIdentifier, ICEBERG_SCHEMA, partitionSpec);
+ warehouse.createTable(tableIdentifier, schema, partitionSpec);
IcebergDestination icebergDestination =
IcebergDestination.builder()
@@ -314,8 +327,15 @@ public class RecordWriterManagerTest {
DataFile datafile = writer.getDataFile();
assertEquals(2L, datafile.recordCount());
+ Map<String, PartitionField> partitionFieldMap = new HashMap<>();
+ for (PartitionField partitionField : PARTITION_SPEC.fields()) {
+ partitionFieldMap.put(partitionField.name(), partitionField);
+ }
+
+ String partitionPath =
+ RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), partitionFieldMap);
DataFile roundTripDataFile =
- SerializableDataFile.from(datafile, partitionKey)
+ SerializableDataFile.from(datafile, partitionPath)
.createDataFile(ImmutableMap.of(PARTITION_SPEC.specId(), PARTITION_SPEC));
checkDataFileEquality(datafile, roundTripDataFile);
@@ -347,8 +367,14 @@ public class RecordWriterManagerTest {
writer.close();
// fetch data file and its serializable version
+ Map<String, PartitionField> partitionFieldMap = new HashMap<>();
+ for (PartitionField partitionField : PARTITION_SPEC.fields()) {
+ partitionFieldMap.put(partitionField.name(), partitionField);
+ }
+ String partitionPath =
+ RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), partitionFieldMap);
DataFile datafile = writer.getDataFile();
- SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionKey);
+ SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionPath);
assertEquals(2L, datafile.recordCount());
assertEquals(serializableDataFile.getPartitionSpecId(), datafile.specId());
@@ -415,6 +441,198 @@ public class RecordWriterManagerTest {
}
}
+ @Test
+ public void testIdentityPartitioning() throws IOException {
+ Schema primitiveTypeSchema =
+ Schema.builder()
+ .addBooleanField("bool")
+ .addInt32Field("int")
+ .addInt64Field("long")
+ .addFloatField("float")
+ .addDoubleField("double")
+ .addStringField("str")
+ .build();
+
+ Row row =
+ Row.withSchema(primitiveTypeSchema).addValues(true, 1, 1L, 1.23f, 4.56, "str").build();
+ org.apache.iceberg.Schema icebergSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(primitiveTypeSchema);
+ PartitionSpec spec =
+ PartitionSpec.builderFor(icebergSchema)
+ .identity("bool")
+ .identity("int")
+ .identity("long")
+ .identity("float")
+ .identity("double")
+ .identity("str")
+ .build();
+ WindowedValue<IcebergDestination> dest =
+ getWindowedDestination("identity_partitioning", icebergSchema, spec);
+
+ RecordWriterManager writer =
+ new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+ writer.write(dest, row);
+ writer.close();
+ List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+ assertEquals(1, files.size());
+ SerializableDataFile dataFile = files.get(0);
+ assertEquals(1, dataFile.getRecordCount());
+ // build this string: bool=true/int=1/long=1/float=1.23/double=4.56/str=str
+ List<String> expectedPartitions = new ArrayList<>();
+ for (Schema.Field field : primitiveTypeSchema.getFields()) {
+ Object val = row.getValue(field.getName());
+ expectedPartitions.add(field.getName() + "=" + val);
+ }
+ String expectedPartitionPath = String.join("/", expectedPartitions);
+ assertEquals(expectedPartitionPath, dataFile.getPartitionPath());
+ assertThat(dataFile.getPath(), containsString(expectedPartitionPath));
+ }
+
+ @Test
+ public void testBucketPartitioning() throws IOException {
+ Schema bucketSchema =
+ Schema.builder()
+ .addInt32Field("int")
+ .addInt64Field("long")
+ .addStringField("str")
+ .addLogicalTypeField("date", SqlTypes.DATE)
+ .addLogicalTypeField("time", SqlTypes.TIME)
+ .addLogicalTypeField("datetime", SqlTypes.DATETIME)
+ .addDateTimeField("datetime_tz")
+ .build();
+
+ String timestamp = "2024-10-08T13:18:20.053";
+ LocalDateTime localDateTime = LocalDateTime.parse(timestamp);
+
+ Row row =
+ Row.withSchema(bucketSchema)
+ .addValues(
+ 1,
+ 1L,
+ "str",
+ localDateTime.toLocalDate(),
+ localDateTime.toLocalTime(),
+ localDateTime,
+ DateTime.parse(timestamp))
+ .build();
+ org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(bucketSchema);
+ PartitionSpec spec =
+ PartitionSpec.builderFor(icebergSchema)
+ .bucket("int", 2)
+ .bucket("long", 2)
+ .bucket("str", 2)
+ .bucket("date", 2)
+ .bucket("time", 2)
+ .bucket("datetime", 2)
+ .bucket("datetime_tz", 2)
+ .build();
+ WindowedValue<IcebergDestination> dest =
+ getWindowedDestination("bucket_partitioning", icebergSchema, spec);
+
+ RecordWriterManager writer =
+ new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+ writer.write(dest, row);
+ writer.close();
+ List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+ assertEquals(1, files.size());
+ SerializableDataFile dataFile = files.get(0);
+ assertEquals(1, dataFile.getRecordCount());
+ for (Schema.Field field : bucketSchema.getFields()) {
+ String expectedPartition = field.getName() + "_bucket";
+ assertThat(dataFile.getPartitionPath(), containsString(expectedPartition));
+ assertThat(dataFile.getPath(), containsString(expectedPartition));
+ }
+ }
+
+ @Test
+ public void testTimePartitioning() throws IOException {
+ Schema timePartitioningSchema =
+ Schema.builder()
+ .addLogicalTypeField("y_date", SqlTypes.DATE)
+ .addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
+ .addDateTimeField("y_datetime_tz")
+ .addLogicalTypeField("m_date", SqlTypes.DATE)
+ .addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
+ .addDateTimeField("m_datetime_tz")
+ .addLogicalTypeField("d_date", SqlTypes.DATE)
+ .addLogicalTypeField("d_datetime", SqlTypes.DATETIME)
+ .addDateTimeField("d_datetime_tz")
+ .addLogicalTypeField("h_datetime", SqlTypes.DATETIME)
+ .addDateTimeField("h_datetime_tz")
+ .build();
+ org.apache.iceberg.Schema icebergSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(timePartitioningSchema);
+ PartitionSpec spec =
+ PartitionSpec.builderFor(icebergSchema)
+ .year("y_date")
+ .year("y_datetime")
+ .year("y_datetime_tz")
+ .month("m_date")
+ .month("m_datetime")
+ .month("m_datetime_tz")
+ .day("d_date")
+ .day("d_datetime")
+ .day("d_datetime_tz")
+ .hour("h_datetime")
+ .hour("h_datetime_tz")
+ .build();
+
+ WindowedValue<IcebergDestination> dest =
+ getWindowedDestination("time_partitioning", icebergSchema, spec);
+
+ String timestamp = "2024-10-08T13:18:20.053";
+ LocalDateTime localDateTime = LocalDateTime.parse(timestamp);
+ LocalDate localDate = localDateTime.toLocalDate();
+ String timestamptz = "2024-10-08T13:18:20.053+03:27";
+ DateTime dateTime = DateTime.parse(timestamptz);
+
+ Row row =
+ Row.withSchema(timePartitioningSchema)
+ .addValues(localDate, localDateTime, dateTime) // year
+ .addValues(localDate, localDateTime, dateTime) // month
+ .addValues(localDate, localDateTime, dateTime) // day
+ .addValues(localDateTime, dateTime) // hour
+ .build();
+
+ // write some rows
+ RecordWriterManager writer =
+ new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+ writer.write(dest, row);
+ writer.close();
+ List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+ assertEquals(1, files.size());
+ SerializableDataFile serializableDataFile = files.get(0);
+ assertEquals(1, serializableDataFile.getRecordCount());
+
+ int year = localDateTime.getYear();
+ int month = localDateTime.getMonthValue();
+ int day = localDateTime.getDayOfMonth();
+ int hour = localDateTime.getHour();
+ List<String> expectedPartitions = new ArrayList<>();
+ for (Schema.Field field : timePartitioningSchema.getFields()) {
+ String name = field.getName();
+ String expected = "";
+ if (name.startsWith("y_")) {
+ expected = String.format("%s_year=%s", name, year);
+ } else if (name.startsWith("m_")) {
+ expected = String.format("%s_month=%s-%02d", name, year, month);
+ } else if (name.startsWith("d_")) {
+ expected = String.format("%s_day=%s-%02d-%02d", name, year, month,
day);
+ } else if (name.startsWith("h_")) {
+ if (name.contains("tz")) {
+ hour = dateTime.withZone(DateTimeZone.UTC).getHourOfDay();
+ }
+ expected = String.format("%s_hour=%s-%02d-%02d-%02d", name, year,
month, day, hour);
+ }
+ expectedPartitions.add(expected);
+ }
+ String expectedPartition = String.join("/", expectedPartitions);
+ DataFile dataFile =
+ serializableDataFile.createDataFile(
+ catalog.loadTable(dest.getValue().getTableIdentifier()).specs());
+ assertThat(dataFile.path().toString(), containsString(expectedPartition));
+ }
+
@Rule public ExpectedException thrown = ExpectedException.none();
@Test