danny0405 commented on code in PR #13365:
URL: https://github.com/apache/hudi/pull/13365#discussion_r2146292275


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkBinaryCopyClusteringAndValidationMeta.java:
##########
@@ -192,4 +219,115 @@ private void checkFileFooter(List<String> clusteredFiles, 
List<HoodieRecord> all
     assertTrue(rows.select("_hoodie_file_name").collectAsList().stream()
         .allMatch(t -> t.getAs(0).equals(clusteredFileName)));
   }
+
+  @Test
+  public void testSupportBinaryStreamCopy() throws IOException {
+    HoodieWriteConfig writeConfig = new HoodieWriteConfig.Builder()
+        .withPath(basePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withEmbeddedTimelineServerEnabled(false)
+        .build();
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext, 
metaClient);
+    SparkBinaryCopyClusteringExecutionStrategy strategy = new 
SparkBinaryCopyClusteringExecutionStrategy(table, engineContext, writeConfig);
+
+    MessageType legacySchema = new AvroSchemaConverter().convert(AVRO_SCHEMA);
+    Configuration conf = new Configuration();
+    conf.set("parquet.avro.write-old-list-structure", "false");
+    MessageType standardSchema = new 
AvroSchemaConverter(conf).convert(AVRO_SCHEMA);
+
+    BloomFilter simpleBloomFilter = BloomFilterFactory.createBloomFilter(1000, 
0.0001, 10000, SIMPLE.name());
+    BloomFilter dynmicBloomFilter = BloomFilterFactory.createBloomFilter(1000, 
0.0001, 10000, DYNAMIC_V0.name());
+    String file1 = makeTestFile("file-1.parquet", AVRO_SCHEMA, legacySchema, 
simpleBloomFilter);
+    String file2 = makeTestFile("file-2.parquet", AVRO_SCHEMA, legacySchema, 
dynmicBloomFilter);
+    String file3 = makeTestFile("file-3.parquet", AVRO_SCHEMA, standardSchema, 
dynmicBloomFilter);
+    String file4 = makeTestFile("file-4.parquet", AVRO_SCHEMA, standardSchema, 
dynmicBloomFilter);
+
+    // input file contains multiple bloom filter code type, should return false
+    List<ClusteringGroupInfo> groups = makeClusteringGroup(file1, file2);
+    Assertions.assertFalse(strategy.supportBinaryStreamCopy(groups, new 
HashMap<>()));
+
+    // input file contains legacy schema, should return false
+    groups = makeClusteringGroup(file2, file3);
+    Assertions.assertFalse(strategy.supportBinaryStreamCopy(groups, new 
HashMap<>()));
+
+    // otherwise should return true
+    groups = makeClusteringGroup(file3, file4);
+    Assertions.assertTrue(strategy.supportBinaryStreamCopy(groups, new 
HashMap<>()));
+
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
+    table = HoodieSparkTable.create(writeConfig, engineContext, metaClient);
+    strategy = new SparkBinaryCopyClusteringExecutionStrategy(table, 
engineContext, writeConfig);
+    Assertions.assertFalse(strategy.supportBinaryStreamCopy(groups, new 
HashMap<>()));
+  }
+
+  private String makeTestFile(String fileName, Schema schema, MessageType 
messageType, BloomFilter filter) throws IOException {
+    HoodieAvroWriteSupport writeSupport = new 
HoodieAvroWriteSupport(messageType, schema, Option.of(filter), new 
Properties());
+    StoragePath filePath = new 
StoragePath(tempDir.resolve(fileName).toAbsolutePath().toString());
+    HoodieConfig hoodieConfig = new HoodieConfig();
+    hoodieConfig.setValue("hoodie.base.path", basePath);
+    HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
+        new HoodieParquetConfig(
+            writeSupport,
+            CompressionCodecName.GZIP,
+            ParquetWriter.DEFAULT_BLOCK_SIZE,
+            ParquetWriter.DEFAULT_PAGE_SIZE,
+            1024 * 1024 * 1024,
+            storageConf,
+            0.1,
+            true);
+
+    HoodieAvroParquetWriter writer =
+        new HoodieAvroParquetWriter(filePath, parquetConfig, "001", new 
DummyTaskContextSupplier(), true);
+    writer.close();
+    return filePath.toString();
+  }
+
+  private List<ClusteringGroupInfo> makeClusteringGroup(String... files) {
+    List<ClusteringOperation> operations = Arrays.stream(files)
+        .map(file -> {
+          ClusteringOperation op = new ClusteringOperation();
+          op.setDataFilePath(file);
+          return op;
+        })
+        .collect(Collectors.toList());
+    ClusteringGroupInfo group = new ClusteringGroupInfo();
+    group.setNumOutputGroups(1);
+    group.setOperations(operations);
+    return Collections.singletonList(group);
+  }
+
+  static class DummyTaskContextSupplier extends TaskContextSupplier {

Review Comment:
   We already have `LocalTaskContextSupplier` for testing; please use it here instead of defining a new `DummyTaskContextSupplier`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to