This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new fbed421697 Flink: Key projection should be based on the requested Flink table schema (#7836)
fbed421697 is described below

commit fbed421697c17cf78c416a0210dc1d539a13f681
Author: Xianyang Liu <[email protected]>
AuthorDate: Tue Jun 27 01:32:57 2023 +0800

    Flink: Key projection should be based on the requested Flink table schema (#7836)
    
    Co-authored-by: xianyangliu <[email protected]>
---
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |   3 +-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |  10 +-
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    | 128 +++++++++++++++++----
 3 files changed, 117 insertions(+), 24 deletions(-)

diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index b1ffda1560..40e0b5f2a3 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -63,7 +63,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
     this.keyWrapper =
         new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
-    this.keyProjection = RowDataProjection.create(schema, deleteSchema);
+    this.keyProjection =
+        RowDataProjection.create(flinkSchema, schema.asStruct(), deleteSchema.asStruct());
     this.upsert = upsert;
   }
 
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 408c281004..5efb7413e7 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -22,7 +22,6 @@ import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -49,6 +48,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDelete;
@@ -349,7 +349,10 @@ public class SimpleDataUtil {
 
   public static StructLikeSet expectedRowSet(Table table, Record... records) {
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    Collections.addAll(set, records);
+    InternalRecordWrapper wrapper = new InternalRecordWrapper(table.schema().asStruct());
+    for (Record record : records) {
+      set.add(wrapper.copyFor(record));
+    }
     return set;
   }
 
@@ -361,12 +364,13 @@ public class SimpleDataUtil {
       throws IOException {
     table.refresh();
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    InternalRecordWrapper wrapper = new InternalRecordWrapper(table.schema().asStruct());
     try (CloseableIterable<Record> reader =
         IcebergGenerics.read(table)
             .useSnapshot(snapshotId == null ? table.currentSnapshot().snapshotId() : snapshotId)
             .select(columns)
             .build()) {
-      reader.forEach(set::add);
+      reader.forEach(record -> set.add(wrapper.copyFor(record)));
     }
     return set;
   }
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 1f8cbfe191..4ecbd1c129 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -23,31 +23,47 @@ import static org.apache.iceberg.flink.SimpleDataUtil.createInsert;
 import static org.apache.iceberg.flink.SimpleDataUtil.createRecord;
 import static org.apache.iceberg.flink.SimpleDataUtil.createUpdateAfter;
 import static org.apache.iceberg.flink.SimpleDataUtil.createUpdateBefore;
+import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.OffsetDateTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeSet;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -79,20 +95,6 @@ public class TestDeltaTaskWriter extends TableTestBase {
     this.metadataDir = new File(tableDir, "metadata");
   }
 
-  private void initTable(boolean partitioned) {
-    if (partitioned) {
-      this.table = create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("data").build());
-    } else {
-      this.table = create(SCHEMA, PartitionSpec.unpartitioned());
-    }
-
-    table
-        .updateProperties()
-        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024))
-        .defaultFormat(format)
-        .commit();
-  }
-
   private int idFieldId() {
     return table.schema().findField("id").fieldId();
   }
@@ -170,18 +172,18 @@ public class TestDeltaTaskWriter extends TableTestBase {
 
   @Test
   public void testUnpartitioned() throws IOException {
-    initTable(false);
+    createAndInitTable(false);
     testCdcEvents(false);
   }
 
   @Test
   public void testPartitioned() throws IOException {
-    initTable(true);
+    createAndInitTable(true);
     testCdcEvents(true);
   }
 
   private void testWritePureEqDeletes(boolean partitioned) throws IOException {
-    initTable(partitioned);
+    createAndInitTable(partitioned);
     List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId());
     TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds);
     taskWriterFactory.initialize(1, 1);
@@ -210,7 +212,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
   }
 
   private void testAbort(boolean partitioned) throws IOException {
-    initTable(partitioned);
+    createAndInitTable(partitioned);
     List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId());
     TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds);
     taskWriterFactory.initialize(1, 1);
@@ -253,7 +255,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
 
   @Test
   public void testPartitionedTableWithDataAsKey() throws IOException {
-    initTable(true);
+    createAndInitTable(true);
     List<Integer> equalityFieldIds = Lists.newArrayList(dataFieldId());
     TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds);
     taskWriterFactory.initialize(1, 1);
@@ -298,7 +300,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
 
   @Test
   public void testPartitionedTableWithDataAndIdAsKey() throws IOException {
-    initTable(true);
+    createAndInitTable(true);
     List<Integer> equalityFieldIds = Lists.newArrayList(dataFieldId(), idFieldId());
     TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds);
     taskWriterFactory.initialize(1, 1);
@@ -321,6 +323,62 @@ public class TestDeltaTaskWriter extends TableTestBase {
         "Should have expected records", expectedRowSet(createRecord(1, "aaa")), actualRowSet("*"));
   }
 
+  @Test
+  public void testEqualityColumnOnCustomPrecisionTSColumn() throws IOException {
+    Schema tableSchema =
+        new Schema(
+            required(3, "id", Types.IntegerType.get()),
+            required(4, "ts", Types.TimestampType.withZone()));
+    RowType flinkType =
+        new RowType(
+            false,
+            ImmutableList.of(
+                new RowType.RowField("id", new IntType()),
+                new RowType.RowField("ts", new LocalZonedTimestampType(3))));
+
+    this.table = create(tableSchema, PartitionSpec.unpartitioned());
+    initTable(table);
+
+    List<Integer> equalityIds = ImmutableList.of(table.schema().findField("ts").fieldId());
+    TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(flinkType, equalityIds);
+    taskWriterFactory.initialize(1, 1);
+
+    TaskWriter<RowData> writer = taskWriterFactory.create();
+    RowDataSerializer serializer = new RowDataSerializer(flinkType);
+    OffsetDateTime start = OffsetDateTime.now();
+    writer.write(
+        serializer.toBinaryRow(
+            GenericRowData.ofKind(
+                RowKind.INSERT, 1, TimestampData.fromInstant(start.toInstant()))));
+    writer.write(
+        serializer.toBinaryRow(
+            GenericRowData.ofKind(
+                RowKind.INSERT, 2, TimestampData.fromInstant(start.plusSeconds(1).toInstant()))));
+    writer.write(
+        serializer.toBinaryRow(
+            GenericRowData.ofKind(
+                RowKind.DELETE, 2, TimestampData.fromInstant(start.plusSeconds(1).toInstant()))));
+
+    WriteResult result = writer.complete();
+    // One data file
+    Assertions.assertThat(result.dataFiles().length).isEqualTo(1);
+    // One eq delete file + one pos delete file
+    Assertions.assertThat(result.deleteFiles().length).isEqualTo(2);
+    Assertions.assertThat(
+            Arrays.stream(result.deleteFiles())
+                .map(ContentFile::content)
+                .collect(Collectors.toSet()))
+        .isEqualTo(Sets.newHashSet(FileContent.POSITION_DELETES, FileContent.EQUALITY_DELETES));
+    commitTransaction(result);
+
+    Record expectedRecord = GenericRecord.create(tableSchema);
+    expectedRecord.setField("id", 1);
+    int cutPrecisionNano = start.getNano() / 1000000 * 1000000;
+    expectedRecord.setField("ts", start.withNano(cutPrecisionNano));
+
+    Assertions.assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(expectedRecord));
+  }
+
   private void commitTransaction(WriteResult result) {
     RowDelta rowDelta = table.newRowDelta();
     Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
@@ -349,4 +407,34 @@ public class TestDeltaTaskWriter extends TableTestBase {
         equalityFieldIds,
         false);
   }
+
+  private TaskWriterFactory<RowData> createTaskWriterFactory(
+      RowType flinkType, List<Integer> equalityFieldIds) {
+    return new RowDataTaskWriterFactory(
+        SerializableTable.copyOf(table),
+        flinkType,
+        128 * 1024 * 1024,
+        format,
+        table.properties(),
+        equalityFieldIds,
+        true);
+  }
+
+  private void createAndInitTable(boolean partitioned) {
+    if (partitioned) {
+      this.table = create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("data").build());
+    } else {
+      this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    }
+
+    initTable(table);
+  }
+
+  private void initTable(TestTables.TestTable testTable) {
+    testTable
+        .updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024))
+        .defaultFormat(format)
+        .commit();
+  }
 }

Reply via email to