This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new fbed421697 Flink: Key projection should be based on the requested
Flink table schema (#7836)
fbed421697 is described below
commit fbed421697c17cf78c416a0210dc1d539a13f681
Author: Xianyang Liu <[email protected]>
AuthorDate: Tue Jun 27 01:32:57 2023 +0800
Flink: Key projection should be based on the requested Flink table schema
(#7836)
Co-authored-by: xianyangliu <[email protected]>
---
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 3 +-
.../org/apache/iceberg/flink/SimpleDataUtil.java | 10 +-
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 128 +++++++++++++++++----
3 files changed, 117 insertions(+), 24 deletions(-)
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index b1ffda1560..40e0b5f2a3 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -63,7 +63,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.keyWrapper =
new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema),
deleteSchema.asStruct());
- this.keyProjection = RowDataProjection.create(schema, deleteSchema);
+ this.keyProjection =
+ RowDataProjection.create(flinkSchema, schema.asStruct(),
deleteSchema.asStruct());
this.upsert = upsert;
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 408c281004..5efb7413e7 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -22,7 +22,6 @@ import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
import java.io.IOException;
import java.time.Duration;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -49,6 +48,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
@@ -349,7 +349,10 @@ public class SimpleDataUtil {
public static StructLikeSet expectedRowSet(Table table, Record... records) {
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
- Collections.addAll(set, records);
+ InternalRecordWrapper wrapper = new
InternalRecordWrapper(table.schema().asStruct());
+ for (Record record : records) {
+ set.add(wrapper.copyFor(record));
+ }
return set;
}
@@ -361,12 +364,13 @@ public class SimpleDataUtil {
throws IOException {
table.refresh();
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ InternalRecordWrapper wrapper = new
InternalRecordWrapper(table.schema().asStruct());
try (CloseableIterable<Record> reader =
IcebergGenerics.read(table)
.useSnapshot(snapshotId == null ?
table.currentSnapshot().snapshotId() : snapshotId)
.select(columns)
.build()) {
- reader.forEach(set::add);
+ reader.forEach(record -> set.add(wrapper.copyFor(record)));
}
return set;
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 1f8cbfe191..4ecbd1c129 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -23,31 +23,47 @@ import static org.apache.iceberg.flink.SimpleDataUtil.createInsert;
import static org.apache.iceberg.flink.SimpleDataUtil.createRecord;
import static org.apache.iceberg.flink.SimpleDataUtil.createUpdateAfter;
import static org.apache.iceberg.flink.SimpleDataUtil.createUpdateBefore;
+import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -79,20 +95,6 @@ public class TestDeltaTaskWriter extends TableTestBase {
this.metadataDir = new File(tableDir, "metadata");
}
- private void initTable(boolean partitioned) {
- if (partitioned) {
- this.table = create(SCHEMA,
PartitionSpec.builderFor(SCHEMA).identity("data").build());
- } else {
- this.table = create(SCHEMA, PartitionSpec.unpartitioned());
- }
-
- table
- .updateProperties()
- .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 *
1024))
- .defaultFormat(format)
- .commit();
- }
-
private int idFieldId() {
return table.schema().findField("id").fieldId();
}
@@ -170,18 +172,18 @@ public class TestDeltaTaskWriter extends TableTestBase {
@Test
public void testUnpartitioned() throws IOException {
- initTable(false);
+ createAndInitTable(false);
testCdcEvents(false);
}
@Test
public void testPartitioned() throws IOException {
- initTable(true);
+ createAndInitTable(true);
testCdcEvents(true);
}
private void testWritePureEqDeletes(boolean partitioned) throws IOException {
- initTable(partitioned);
+ createAndInitTable(partitioned);
List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId());
TaskWriterFactory<RowData> taskWriterFactory =
createTaskWriterFactory(equalityFieldIds);
taskWriterFactory.initialize(1, 1);
@@ -210,7 +212,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
}
private void testAbort(boolean partitioned) throws IOException {
- initTable(partitioned);
+ createAndInitTable(partitioned);
List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId());
TaskWriterFactory<RowData> taskWriterFactory =
createTaskWriterFactory(equalityFieldIds);
taskWriterFactory.initialize(1, 1);
@@ -253,7 +255,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
@Test
public void testPartitionedTableWithDataAsKey() throws IOException {
- initTable(true);
+ createAndInitTable(true);
List<Integer> equalityFieldIds = Lists.newArrayList(dataFieldId());
TaskWriterFactory<RowData> taskWriterFactory =
createTaskWriterFactory(equalityFieldIds);
taskWriterFactory.initialize(1, 1);
@@ -298,7 +300,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
@Test
public void testPartitionedTableWithDataAndIdAsKey() throws IOException {
- initTable(true);
+ createAndInitTable(true);
List<Integer> equalityFieldIds = Lists.newArrayList(dataFieldId(),
idFieldId());
TaskWriterFactory<RowData> taskWriterFactory =
createTaskWriterFactory(equalityFieldIds);
taskWriterFactory.initialize(1, 1);
@@ -321,6 +323,62 @@ public class TestDeltaTaskWriter extends TableTestBase {
"Should have expected records", expectedRowSet(createRecord(1,
"aaa")), actualRowSet("*"));
}
+ @Test
+ public void testEqualityColumnOnCustomPrecisionTSColumn() throws IOException
{
+ Schema tableSchema =
+ new Schema(
+ required(3, "id", Types.IntegerType.get()),
+ required(4, "ts", Types.TimestampType.withZone()));
+ RowType flinkType =
+ new RowType(
+ false,
+ ImmutableList.of(
+ new RowType.RowField("id", new IntType()),
+ new RowType.RowField("ts", new LocalZonedTimestampType(3))));
+
+ this.table = create(tableSchema, PartitionSpec.unpartitioned());
+ initTable(table);
+
+ List<Integer> equalityIds =
ImmutableList.of(table.schema().findField("ts").fieldId());
+ TaskWriterFactory<RowData> taskWriterFactory =
createTaskWriterFactory(flinkType, equalityIds);
+ taskWriterFactory.initialize(1, 1);
+
+ TaskWriter<RowData> writer = taskWriterFactory.create();
+ RowDataSerializer serializer = new RowDataSerializer(flinkType);
+ OffsetDateTime start = OffsetDateTime.now();
+ writer.write(
+ serializer.toBinaryRow(
+ GenericRowData.ofKind(
+ RowKind.INSERT, 1,
TimestampData.fromInstant(start.toInstant()))));
+ writer.write(
+ serializer.toBinaryRow(
+ GenericRowData.ofKind(
+ RowKind.INSERT, 2,
TimestampData.fromInstant(start.plusSeconds(1).toInstant()))));
+ writer.write(
+ serializer.toBinaryRow(
+ GenericRowData.ofKind(
+ RowKind.DELETE, 2,
TimestampData.fromInstant(start.plusSeconds(1).toInstant()))));
+
+ WriteResult result = writer.complete();
+ // One data file
+ Assertions.assertThat(result.dataFiles().length).isEqualTo(1);
+ // One eq delete file + one pos delete file
+ Assertions.assertThat(result.deleteFiles().length).isEqualTo(2);
+ Assertions.assertThat(
+ Arrays.stream(result.deleteFiles())
+ .map(ContentFile::content)
+ .collect(Collectors.toSet()))
+ .isEqualTo(Sets.newHashSet(FileContent.POSITION_DELETES,
FileContent.EQUALITY_DELETES));
+ commitTransaction(result);
+
+ Record expectedRecord = GenericRecord.create(tableSchema);
+ expectedRecord.setField("id", 1);
+ int cutPrecisionNano = start.getNano() / 1000000 * 1000000;
+ expectedRecord.setField("ts", start.withNano(cutPrecisionNano));
+
+
Assertions.assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(expectedRecord));
+ }
+
private void commitTransaction(WriteResult result) {
RowDelta rowDelta = table.newRowDelta();
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
@@ -349,4 +407,34 @@ public class TestDeltaTaskWriter extends TableTestBase {
equalityFieldIds,
false);
}
+
+ private TaskWriterFactory<RowData> createTaskWriterFactory(
+ RowType flinkType, List<Integer> equalityFieldIds) {
+ return new RowDataTaskWriterFactory(
+ SerializableTable.copyOf(table),
+ flinkType,
+ 128 * 1024 * 1024,
+ format,
+ table.properties(),
+ equalityFieldIds,
+ true);
+ }
+
+ private void createAndInitTable(boolean partitioned) {
+ if (partitioned) {
+ this.table = create(SCHEMA,
PartitionSpec.builderFor(SCHEMA).identity("data").build());
+ } else {
+ this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+ }
+
+ initTable(table);
+ }
+
+ private void initTable(TestTables.TestTable testTable) {
+ testTable
+ .updateProperties()
+ .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 *
1024))
+ .defaultFormat(format)
+ .commit();
+ }
}