This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a60266142dd [MINOR] Cosmetic changes for names and log msgs (#11179)
a60266142dd is described below
commit a60266142dd4ab2c45f9ba1222a9ff17e806a8c3
Author: Danny Chan <[email protected]>
AuthorDate: Thu May 9 16:12:21 2024 +0800
[MINOR] Cosmetic changes for names and log msgs (#11179)
---
.../apache/hudi/index/bucket/HoodieSimpleBucketIndex.java | 2 +-
.../hudi/common/table/view/HoodieTableFileSystemView.java | 9 ++++-----
.../partitioner/StreamReadAppendPartitioner.java | 13 ++++++++-----
.../partitioner/StreamReadBucketIndexPartitioner.java | 13 ++++++++-----
.../selector/StreamReadAppendKeySelector.java | 2 +-
.../selector/StreamReadBucketIndexKeySelector.java | 2 +-
.../main/java/org/apache/hudi/table/HoodieTableSource.java | 8 ++++----
7 files changed, 27 insertions(+), 22 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index 4bbe12675fe..3cde2a23139 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -96,7 +96,7 @@ public class HoodieSimpleBucketIndex extends
HoodieBucketIndex {
}
/**
- * Find out the conflict files in bucket partition with bucket id.
+ * Find out the conflict instants with given partition and bucket id.
*/
public List<String> findConflictInstantsInPartition(HoodieTable hoodieTable,
String partition, int bucketId, Set<String> pendingInstants) {
List<String> instants = new ArrayList<>();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index 819ca2c3280..d132b19c3de 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -307,13 +307,12 @@ public class HoodieTableFileSystemView extends
IncrementalTimelineSyncFileSystem
*/
@Override
Stream<HoodieFileGroup> fetchAllStoredFileGroups(String partition) {
- List<HoodieFileGroup> hoodieFileGroups =
partitionToFileGroupsMap.get(partition);
- if (hoodieFileGroups == null || hoodieFileGroups.size() == 0) {
- LOG.warn("partition: {} is not available in store");
+ List<HoodieFileGroup> fileGroups = partitionToFileGroupsMap.get(partition);
+ if (fileGroups == null || fileGroups.isEmpty()) {
+ LOG.warn("Partition: {} is not available in store", partition);
return Stream.empty();
}
- final List<HoodieFileGroup> fileGroups = new
ArrayList<>(partitionToFileGroupsMap.get(partition));
- return fileGroups.stream();
+ return new ArrayList<>(partitionToFileGroupsMap.get(partition)).stream();
}
public Stream<HoodieFileGroup> getAllFileGroups() {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/partitioner/StreamReadAppendPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadAppendPartitioner.java
similarity index 79%
rename from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/partitioner/StreamReadAppendPartitioner.java
rename to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadAppendPartitioner.java
index 67bd9f9e324..3a6ae09ad58 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/partitioner/StreamReadAppendPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadAppendPartitioner.java
@@ -16,20 +16,23 @@
* limitations under the License.
*/
-package org.apache.hudi.source.filedistribution.partitioner;
+package org.apache.hudi.source.rebalance.partitioner;
import org.apache.flink.api.common.functions.Partitioner;
+/**
+ * Partitioner for regular streaming read.
+ */
public class StreamReadAppendPartitioner implements Partitioner<Integer> {
- private final int parallNum;
+ private final int parallelism;
- public StreamReadAppendPartitioner(int parallNum) {
- this.parallNum = parallNum;
+ public StreamReadAppendPartitioner(int parallelism) {
+ this.parallelism = parallelism;
}
@Override
public int partition(Integer splitNum, int maxParallelism) {
- return splitNum % parallNum;
+ return splitNum % parallelism;
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/partitioner/StreamReadBucketIndexPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java
similarity index 82%
rename from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/partitioner/StreamReadBucketIndexPartitioner.java
rename to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java
index 4b5531b67ba..59971c615cd 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/partitioner/StreamReadBucketIndexPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java
@@ -16,22 +16,25 @@
* limitations under the License.
*/
-package org.apache.hudi.source.filedistribution.partitioner;
+package org.apache.hudi.source.rebalance.partitioner;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.flink.api.common.functions.Partitioner;
+/**
+ * Partitioner for table with bucket index type.
+ */
public class StreamReadBucketIndexPartitioner implements Partitioner<String> {
- private final int parallNum;
+ private final int parallelism;
- public StreamReadBucketIndexPartitioner(int parallNum) {
- this.parallNum = parallNum;
+ public StreamReadBucketIndexPartitioner(int parallelism) {
+ this.parallelism = parallelism;
}
@Override
public int partition(String fileName, int maxParallelism) {
- return BucketIdentifier.bucketIdFromFileId(fileName) % parallNum;
+ return BucketIdentifier.bucketIdFromFileId(fileName) % parallelism;
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/selector/StreamReadAppendKeySelector.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadAppendKeySelector.java
similarity index 95%
rename from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/selector/StreamReadAppendKeySelector.java
rename to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadAppendKeySelector.java
index de4a5f85f9c..6b7588918a0 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/selector/StreamReadAppendKeySelector.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadAppendKeySelector.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.source.filedistribution.selector;
+package org.apache.hudi.source.rebalance.selector;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/selector/StreamReadBucketIndexKeySelector.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java
similarity index 95%
rename from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/selector/StreamReadBucketIndexKeySelector.java
rename to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java
index d1db6559659..bfcb56a0d1d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/filedistribution/selector/StreamReadBucketIndexKeySelector.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.source.filedistribution.selector;
+package org.apache.hudi.source.rebalance.selector;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index f955a879c5f..49f41d7fcad 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -46,10 +46,10 @@ import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
-import
org.apache.hudi.source.filedistribution.partitioner.StreamReadAppendPartitioner;
-import
org.apache.hudi.source.filedistribution.partitioner.StreamReadBucketIndexPartitioner;
-import
org.apache.hudi.source.filedistribution.selector.StreamReadAppendKeySelector;
-import
org.apache.hudi.source.filedistribution.selector.StreamReadBucketIndexKeySelector;
+import
org.apache.hudi.source.rebalance.partitioner.StreamReadAppendPartitioner;
+import
org.apache.hudi.source.rebalance.partitioner.StreamReadBucketIndexPartitioner;
+import org.apache.hudi.source.rebalance.selector.StreamReadAppendKeySelector;
+import
org.apache.hudi.source.rebalance.selector.StreamReadBucketIndexKeySelector;
import org.apache.hudi.source.prune.DataPruner;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.source.prune.PrimaryKeyPruners;