danny0405 commented on a change in pull request #3390:
URL: https://github.com/apache/hudi/pull/3390#discussion_r682250961
##########
File path: hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
##########
@@ -394,6 +394,48 @@ public static void checkWrittenData(
}
}
+  /**
+   * Checks that the source data set is written as expected.
+   * Different from {@link #checkWrittenData}, it reads all the data files.
+   *
+   * <p>Note: Replace it with the Flink reader when it is supported.
+   *
+   * @param baseFile   The file base to check, should be a directory
+   * @param expected   The expected results mapping, the key should be the partition path
+   *                   and the value should be the expected values for that partition
+   * @param partitions The expected number of partitions
+   */
+  public static void checkWrittenAllData(
+      File baseFile,
+      Map<String, String> expected,
+      int partitions) throws IOException {
+    assert baseFile.isDirectory();
+    FileFilter filter = file -> !file.getName().startsWith(".");
+    File[] partitionDirs = baseFile.listFiles(filter);
+
+    assertNotNull(partitionDirs);
+    assertThat(partitionDirs.length, is(partitions));
+
+    for (File partitionDir : partitionDirs) {
+      File[] dataFiles = partitionDir.listFiles(filter);
+      assertNotNull(dataFiles);
+
+      List<String> readBuffer = new ArrayList<>();
+      for (File dataFile : dataFiles) {
+        ParquetReader<GenericRecord> reader = AvroParquetReader
+            .<GenericRecord>builder(new Path(dataFile.getAbsolutePath())).build();
+        GenericRecord nextRecord = reader.read();
+        while (nextRecord != null) {
+          readBuffer.add(filterOutVariables(nextRecord));
+          nextRecord = reader.read();
+        }
+        readBuffer.sort(Comparator.naturalOrder());
+      }
Review comment:
`readBuffer.sort(Comparator.naturalOrder());` can be moved out of the for loop, I guess.
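
For illustration, a minimal sketch of the suggested change against the loop quoted above; only the `sort` call moves, everything else stays as in the patch:

```java
List<String> readBuffer = new ArrayList<>();
for (File dataFile : dataFiles) {
  ParquetReader<GenericRecord> reader = AvroParquetReader
      .<GenericRecord>builder(new Path(dataFile.getAbsolutePath())).build();
  GenericRecord nextRecord = reader.read();
  while (nextRecord != null) {
    readBuffer.add(filterOutVariables(nextRecord));
    nextRecord = reader.read();
  }
}
// Sorting once after all data files are read yields the same final order
// as re-sorting after every file, without the repeated O(n log n) passes.
readBuffer.sort(Comparator.naturalOrder());
```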
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
##########
@@ -117,21 +115,25 @@ public BucketAssignFunction(Configuration conf) {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    this.hadoopConf = StreamerUtil.getHadoopConf();
     HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
-        new SerializableConfiguration(this.hadoopConf),
+        new SerializableConfiguration(StreamerUtil.getHadoopConf()),
         new FlinkTaskContextSupplier(getRuntimeContext()));
     this.bucketAssigner = BucketAssigners.create(
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getMaxNumberOfParallelSubtasks(),
         getRuntimeContext().getNumberOfParallelSubtasks(),
-        WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
+        ignoreSmallFiles(),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
         context,
         writeConfig);
     this.payloadCreation = PayloadCreation.instance(this.conf);
   }
+
+  private boolean ignoreSmallFiles() {
+    WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+    return WriteOperationType.isOverwrite(operationType) || StreamerUtil.allowDuplicateInserts(conf);
+  }
Review comment:
`StreamerUtil.allowDuplicateInserts(conf)` can be replaced by `writeConfig.allowDuplicateInserts`.
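
For illustration, a hedged sketch of what the suggested replacement might look like; passing the local `writeConfig` from `open()` into the helper is an assumption of this sketch, not part of the patch:

```java
private boolean ignoreSmallFiles(HoodieWriteConfig writeConfig) {
  WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
  // Reuses the write config already built in open() instead of
  // re-deriving the flag from the raw Flink configuration.
  return WriteOperationType.isOverwrite(operationType) || writeConfig.allowDuplicateInserts();
}
```

The call site in `open()` would then become `ignoreSmallFiles(writeConfig)`.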