This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit eff20567c23cca3d5d6b2d3364e3b95e197be1af
Author: Sagar Sumit <[email protected]>
AuthorDate: Thu Sep 29 20:59:14 2022 +0530

    [MINOR] Use base path URI in ITTestDataStreamWrite (#6826)
---
 .../java/org/apache/hudi/sink/ITTestDataStreamWrite.java | 16 ++++++++--------
 .../src/test/java/org/apache/hudi/utils/TestData.java | 4 ++--
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index e6d2ddb7b5..193c0abcd8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -100,7 +100,7 @@ public class ITTestDataStreamWrite extends TestLogger {
   @ParameterizedTest
   @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
   public void testWriteCopyOnWrite(String indexType) throws Exception {
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setString(FlinkOptions.INDEX_TYPE, indexType);
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
     conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
@@ -146,7 +146,7 @@ public class ITTestDataStreamWrite extends TestLogger {
   @ParameterizedTest
   @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
   public void testWriteMergeOnReadWithCompaction(String indexType) throws Exception {
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setString(FlinkOptions.INDEX_TYPE, indexType);
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
     conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
@@ -167,7 +167,7 @@ public class ITTestDataStreamWrite extends TestLogger {
   }

   private void testWriteCopyOnWriteWithClustering(boolean sortClusteringEnabled) throws Exception {
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
     conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.OPERATION, "insert");
@@ -182,7 +182,7 @@ public class ITTestDataStreamWrite extends TestLogger {
       Transformer transformer,
       String jobName,
       Map<String, List<String>> expected) throws Exception {
-    testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()),
+    testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.toURI().toString()),
         Option.of(transformer), jobName, 2, expected);
   }

@@ -336,7 +336,7 @@ public class ITTestDataStreamWrite extends TestLogger {
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setString(FlinkOptions.TABLE_NAME, "t1");
     conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");

@@ -345,10 +345,10 @@ public class ITTestDataStreamWrite extends TestLogger {
     TestData.writeData(TestData.dataSetInsert(3, 4), conf);
     TestData.writeData(TestData.dataSetInsert(5, 6), conf);

-    String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+    String latestCommit = TestUtils.getLastCompleteInstant(tempFile.toURI().toString());

     Map<String, String> options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
     options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);

     //read a hoodie table use low-level source api.
@@ -378,7 +378,7 @@ public class ITTestDataStreamWrite extends TestLogger {
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
     options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
     Configuration conf = Configuration.fromMap(options);
     // Read from file source
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index e7e34cc15a..8e1dd9964c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -626,8 +626,8 @@ public class TestData {
       Map<String, List<String>> expected) throws IOException {

     // 1. init flink table
-    HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.toURI().toString());
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.toURI().toString()).build();
     HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);

     // 2. check each partition data
