XiaoHongbo-Hope commented on code in PR #6766:
URL: https://github.com/apache/paimon/pull/6766#discussion_r2597196385
##########
paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java:
##########
@@ -198,6 +220,139 @@ public void testJavaWriteReadPkTable() throws Exception {
"6, Beef, Meat, 8.0");
}
+ /**
+  * Writes three committed batches of inserts and deletes into the deletion-vector enabled
+  * table {@code test_pk_dv} and checks which rows remain readable afterwards.
+  *
+  * <p>The expected rows are consistent with the first two columns acting as the primary key;
+  * the schema itself comes from {@code createFileStoreTable} and is not visible in this hunk.
+  */
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testPKDeletionVectorWrite() throws Exception {
+     Path dvTablePath = new Path(warehouse.toString() + "/default.db/" + "test_pk_dv");
+     Consumer<Options> dvOptions =
+             conf -> {
+                 // 1-byte target file size, intended to leave many files in a level
+                 conf.set(TARGET_FILE_SIZE, new MemorySize(1));
+                 conf.set(DELETION_VECTORS_ENABLED, true);
+             };
+     FileStoreTable dvTable = createFileStoreTable(dvOptions, dvTablePath);
+     StreamTableWrite writer = dvTable.newWrite(commitUser);
+     writer.withIOManager(IOManager.create(dvTablePath.toString()));
+     StreamTableCommit committer = dvTable.newCommit(commitUser);
+
+     // commit 0: initial rows
+     writer.write(createRow3Cols(1, 10, 100L));
+     writer.write(createRow3Cols(2, 20, 200L));
+     writer.write(createRow3Cols(1, 11, 101L));
+     committer.commit(0, writer.prepareCommit(true, 0));
+
+     // commit 1: rewrites (1, 10) and writes (2, 21) twice
+     writer.write(createRow3Cols(1, 10, 1000L));
+     writer.write(createRow3Cols(2, 21, 201L));
+     writer.write(createRow3Cols(2, 21, 2001L));
+     committer.commit(1, writer.prepareCommit(true, 1));
+
+     // commit 2: more writes, then DELETE rows for (1, 11) and (2, 20)
+     writer.write(createRow3Cols(1, 11, 1001L));
+     writer.write(createRow3Cols(2, 21, 20001L));
+     writer.write(createRow3Cols(2, 22, 202L));
+     writer.write(createRow3ColsWithKind(RowKind.DELETE, 1, 11, 1001L));
+     writer.write(createRow3ColsWithKind(RowKind.DELETE, 2, 20, 200L));
+     committer.commit(2, writer.prepareCommit(true, 2));
+
+     // read everything back and render each row as text
+     Function<InternalRow, String> renderRow =
+             r ->
+                     internalRowToString(
+                             r,
+                             DataTypes.ROW(
+                                     DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()));
+     List<String> actualRows =
+             getResult(dvTable.newRead(), dvTable.newScan().plan().splits(), renderRow);
+     assertThat(actualRows)
+             .containsExactlyInAnyOrder(
+                     "+I[1, 10, 1000]", "+I[2, 21, 20001]", "+I[2, 22, 202]");
+ }
+
+ /**
+  * Writes 10000 rows {@code (1, i * 10, i * 100)} for {@code i} in 1..10000 into the
+  * deletion-vector enabled table {@code test_pk_dv_multi_batch}, commits them, then deletes the
+  * row {@code (1, 81930, 819300)} in a second commit and checks that exactly that row is
+  * missing from the read result.
+  */
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testPKDeletionVectorWriteMultiBatch() throws Exception {
+     Consumer<Options> optionsSetter =
+             options -> {
+                 // 128 KiB target file size, intended to leave many files in a level
+                 options.set(TARGET_FILE_SIZE, new MemorySize(128 * 1024));
+                 options.set(DELETION_VECTORS_ENABLED, true);
+             };
+     String tableName = "test_pk_dv_multi_batch";
+     Path tablePath = new Path(warehouse.toString() + "/default.db/" + tableName);
+     FileStoreTable table = createFileStoreTable(optionsSetter, tablePath);
+     StreamTableWrite write = table.newWrite(commitUser);
+     IOManager ioManager = IOManager.create(tablePath.toString());
+     write.withIOManager(ioManager);
+     StreamTableCommit commit = table.newCommit(commitUser);
+
+     // Write 10000 records
+     for (int i = 1; i <= 10000; i++) {
+         write.write(createRow3Cols(1, i * 10, (long) i * 100));
+     }
+     commit.commit(0, write.prepareCommit(false, 0));
+
+     // Delete the row whose second column is 81930 (the one written at i = 8193)
+     write.write(createRow3ColsWithKind(RowKind.DELETE, 1, 81930, 819300L));
+     commit.commit(1, write.prepareCommit(true, 1));
+
+     Function<InternalRow, String> rowDataToString =
+             row ->
+                     internalRowToString(
+                             row,
+                             DataTypes.ROW(
+                                     DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()));
+     List<String> result =
+             getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString);
+
+     // Verify the count is 9999
+     assertThat(result).hasSize(9999);
+
+     // Rows are rendered without a long suffix (see the positive assertions below), so the
+     // deleted row must be spelled without a trailing "L"; with the suffix this check could
+     // never fail and would not verify the delete.
+     assertThat(result).doesNotContain("+I[1, 81930, 819300]");
+
+     assertThat(result).contains("+I[1, 10, 100]");
+     assertThat(result).contains("+I[1, 100000, 1000000]");
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testPKDeletionVectorWriteMultiBatchRawConvertable() throws
Exception {
+ Consumer<Options> optionsSetter =
+ options -> {
+ options.set(DELETION_VECTORS_ENABLED, true);
+ };
+ String tableName = "test_pk_dv_raw_convertable";
+ Path tablePath = new Path(warehouse.toString() + "/default.db/" +
tableName);
+ FileStoreTable table = createFileStoreTable(optionsSetter, tablePath);
+ StreamTableWrite write = table.newWrite(commitUser);
+ IOManager ioManager = IOManager.create(tablePath.toString());
+ write.withIOManager(ioManager);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ for (int i = 1; i <= 10000; i++) {
+ write.write(createRow3Cols(1, i * 10, (long) i * 100));
+ }
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.write(createRow3ColsWithKind(RowKind.DELETE, 1, 81930, 819300L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ Function<InternalRow, String> rowDataToString =
+ row ->
+ internalRowToString(
+ row,
+ DataTypes.ROW(
+ DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()));
+ List<String> result =
+ getResult(table.newRead(), table.newScan().plan().splits(),
rowDataToString);
+
+ assertThat(result).hasSize(9999);
+
+ assertThat(result).doesNotContain("+I[1, 81930, 819300L]");
+
Review Comment:
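Remove the "L" suffix from the expected string here: rows are rendered as "+I[1, 81930, 819300]" (no long suffix), so doesNotContain("+I[1, 81930, 819300L]") can never fail and does not actually verify the delete.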
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]