stevenzwu commented on code in PR #7836:
URL: https://github.com/apache/iceberg/pull/7836#discussion_r1240545609
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java:
##########
@@ -349,4 +397,42 @@ private TaskWriterFactory<RowData>
createTaskWriterFactory(List<Integer> equalit
equalityFieldIds,
false);
}
+
+ private TaskWriterFactory<RowData> createTaskWriterFactory(
+ RowType flinkType, List<Integer> equalityFieldIds) {
+ return new RowDataTaskWriterFactory(
+ SerializableTable.copyOf(table),
+ flinkType,
+ 128 * 1024 * 1024,
+ format,
+ table.properties(),
+ equalityFieldIds,
+ true);
+ }
+
+ private void initTable(boolean partitioned) {
+ if (partitioned) {
+ this.table = create(SCHEMA,
PartitionSpec.builderFor(SCHEMA).identity("data").build());
+ } else {
+ this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+ }
+
+ initTable(table);
+ }
+
+ private void initTable(TestTables.TestTable testTable) {
+ this.table = testTable;
+
+ table
+ .updateProperties()
+ .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 *
1024))
+ .defaultFormat(format)
+ .commit();
+ }
+
+ private RowData createBinaryRowData(
Review Comment:
curious why this needs to be `BinaryRowData`? what's the problem with
`GenericRowData`?
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java:
##########
@@ -349,4 +397,42 @@ private TaskWriterFactory<RowData>
createTaskWriterFactory(List<Integer> equalit
equalityFieldIds,
false);
}
+
+ private TaskWriterFactory<RowData> createTaskWriterFactory(
+ RowType flinkType, List<Integer> equalityFieldIds) {
+ return new RowDataTaskWriterFactory(
+ SerializableTable.copyOf(table),
+ flinkType,
+ 128 * 1024 * 1024,
+ format,
+ table.properties(),
+ equalityFieldIds,
+ true);
+ }
+
+ private void initTable(boolean partitioned) {
+ if (partitioned) {
+ this.table = create(SCHEMA,
PartitionSpec.builderFor(SCHEMA).identity("data").build());
+ } else {
+ this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+ }
+
+ initTable(table);
+ }
+
+ private void initTable(TestTables.TestTable testTable) {
+ this.table = testTable;
Review Comment:
this is redundant with line 420 above, which signals that the structure may not
be ideal.
maybe the method at line 413 should be called `createTable`, and this method can
be kept as `initTable`. the usage at line 338 can then be broken into two lines
(first, create the table; second, call this method to init the table).
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java:
##########
@@ -321,6 +322,53 @@ public void testPartitionedTableWithDataAndIdAsKey()
throws IOException {
"Should have expected records", expectedRowSet(createRecord(1,
"aaa")), actualRowSet("*"));
}
+ @Test
+ public void testEqualityColumnOnCustomPrecisionTSColumn() throws IOException
{
+ Schema tableSchema =
+ new Schema(
+ required(3, "id", Types.IntegerType.get()),
+ required(4, "ts", Types.TimestampType.withZone()));
+ RowType flinkType =
+ new RowType(
+ false,
+ ImmutableList.of(
+ new RowType.RowField("id", new IntType()),
+ new RowType.RowField("ts", new LocalZonedTimestampType(3))));
+
+ initTable(create(tableSchema, PartitionSpec.unpartitioned()));
+
+ List<Integer> equalityIds =
ImmutableList.of(table.schema().findField("ts").fieldId());
+ TaskWriterFactory<RowData> taskWriterFactory =
createTaskWriterFactory(flinkType, equalityIds);
+ taskWriterFactory.initialize(1, 1);
+
+ TaskWriter<RowData> writer = taskWriterFactory.create();
+ RowDataSerializer serializer = new RowDataSerializer(flinkType);
+
+ OffsetDateTime start = OffsetDateTime.now();
+ writer.write(createBinaryRowData(serializer, RowKind.INSERT, 1, start));
+ writer.write(createBinaryRowData(serializer, RowKind.INSERT, 2,
start.plusSeconds(1)));
+
Review Comment:
nit: empty line not necessary
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java:
##########
@@ -349,4 +397,42 @@ private TaskWriterFactory<RowData>
createTaskWriterFactory(List<Integer> equalit
equalityFieldIds,
false);
}
+
+ private TaskWriterFactory<RowData> createTaskWriterFactory(
+ RowType flinkType, List<Integer> equalityFieldIds) {
+ return new RowDataTaskWriterFactory(
+ SerializableTable.copyOf(table),
+ flinkType,
+ 128 * 1024 * 1024,
+ format,
+ table.properties(),
+ equalityFieldIds,
+ true);
+ }
+
+ private void initTable(boolean partitioned) {
+ if (partitioned) {
+ this.table = create(SCHEMA,
PartitionSpec.builderFor(SCHEMA).identity("data").build());
+ } else {
+ this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+ }
+
+ initTable(table);
+ }
+
+ private void initTable(TestTables.TestTable testTable) {
+ this.table = testTable;
+
+ table
+ .updateProperties()
+ .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 *
1024))
+ .defaultFormat(format)
+ .commit();
+ }
+
+ private RowData createBinaryRowData(
Review Comment:
also, we can probably remove this method, for two reasons:
- the method name is too broad/generic; it is meant for one specific test
method and schema
- it doesn't save much space; it would be more readable to construct the
RowData directly at the place where it is used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]