zhangyue19921010 commented on code in PR #13365:
URL: https://github.com/apache/hudi/pull/13365#discussion_r2149182005
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkBinaryCopyClusteringAndValidationMeta.java:
##########
@@ -192,4 +219,115 @@ private void checkFileFooter(List<String> clusteredFiles, List<HoodieRecord> all
assertTrue(rows.select("_hoodie_file_name").collectAsList().stream()
.allMatch(t -> t.getAs(0).equals(clusteredFileName)));
}
+
+ @Test
+ public void testSupportBinaryStreamCopy() throws IOException {
+ HoodieWriteConfig writeConfig = new HoodieWriteConfig.Builder()
+ .withPath(basePath)
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withEmbeddedTimelineServerEnabled(false)
+ .build();
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext,
metaClient);
+ SparkBinaryCopyClusteringExecutionStrategy strategy = new
SparkBinaryCopyClusteringExecutionStrategy(table, engineContext, writeConfig);
+
+ MessageType legacySchema = new AvroSchemaConverter().convert(AVRO_SCHEMA);
+ Configuration conf = new Configuration();
+ conf.set("parquet.avro.write-old-list-structure", "false");
+ MessageType standardSchema = new
AvroSchemaConverter(conf).convert(AVRO_SCHEMA);
+
+ BloomFilter simpleBloomFilter = BloomFilterFactory.createBloomFilter(1000,
0.0001, 10000, SIMPLE.name());
+ BloomFilter dynmicBloomFilter = BloomFilterFactory.createBloomFilter(1000,
0.0001, 10000, DYNAMIC_V0.name());
+ String file1 = makeTestFile("file-1.parquet", AVRO_SCHEMA, legacySchema,
simpleBloomFilter);
+ String file2 = makeTestFile("file-2.parquet", AVRO_SCHEMA, legacySchema,
dynmicBloomFilter);
+ String file3 = makeTestFile("file-3.parquet", AVRO_SCHEMA, standardSchema,
dynmicBloomFilter);
+ String file4 = makeTestFile("file-4.parquet", AVRO_SCHEMA, standardSchema,
dynmicBloomFilter);
+
+ // input file contains multiple bloom filter code type, should return false
+ List<ClusteringGroupInfo> groups = makeClusteringGroup(file1, file2);
+ Assertions.assertFalse(strategy.supportBinaryStreamCopy(groups, new
HashMap<>()));
+
+ // input file contains legacy schema, should return false
+ groups = makeClusteringGroup(file2, file3);
+ Assertions.assertFalse(strategy.supportBinaryStreamCopy(groups, new
HashMap<>()));
+
+ // otherwise should return true
+ groups = makeClusteringGroup(file3, file4);
+ Assertions.assertTrue(strategy.supportBinaryStreamCopy(groups, new
HashMap<>()));
+
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
+ table = HoodieSparkTable.create(writeConfig, engineContext, metaClient);
+ strategy = new SparkBinaryCopyClusteringExecutionStrategy(table,
engineContext, writeConfig);
+ Assertions.assertFalse(strategy.supportBinaryStreamCopy(groups, new
HashMap<>()));
+ }
+
+ private String makeTestFile(String fileName, Schema schema, MessageType
messageType, BloomFilter filter) throws IOException {
+ HoodieAvroWriteSupport writeSupport = new
HoodieAvroWriteSupport(messageType, schema, Option.of(filter), new
Properties());
+ StoragePath filePath = new
StoragePath(tempDir.resolve(fileName).toAbsolutePath().toString());
+ HoodieConfig hoodieConfig = new HoodieConfig();
+ hoodieConfig.setValue("hoodie.base.path", basePath);
+ HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
+ new HoodieParquetConfig(
+ writeSupport,
+ CompressionCodecName.GZIP,
+ ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE,
+ 1024 * 1024 * 1024,
+ storageConf,
+ 0.1,
+ true);
+
+ HoodieAvroParquetWriter writer =
+ new HoodieAvroParquetWriter(filePath, parquetConfig, "001", new
DummyTaskContextSupplier(), true);
+ writer.close();
+ return filePath.toString();
+ }
+
+ private List<ClusteringGroupInfo> makeClusteringGroup(String... files) {
+ List<ClusteringOperation> operations = Arrays.stream(files)
+ .map(file -> {
+ ClusteringOperation op = new ClusteringOperation();
+ op.setDataFilePath(file);
+ return op;
+ })
+ .collect(Collectors.toList());
+ ClusteringGroupInfo group = new ClusteringGroupInfo();
+ group.setNumOutputGroups(1);
+ group.setOperations(operations);
+ return Collections.singletonList(group);
+ }
+
+ static class DummyTaskContextSupplier extends TaskContextSupplier {
Review Comment:
changed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]