morningman commented on code in PR #34032:
URL: https://github.com/apache/doris/pull/34032#discussion_r1597645580


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java:
##########
@@ -370,9 +375,51 @@ public List<Split> getSplits() throws UserException {
         } catch (InterruptedException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
+    }
+
+    @Override
+    public List<Split> getSplits() throws UserException {

Review Comment:
   I think the `getSplits()` method should also be in the `SplitGenerator` interface?



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java:
##########
@@ -370,9 +375,51 @@ public List<Split> getSplits() throws UserException {
         } catch (InterruptedException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
+    }
+
+    @Override
+    public List<Split> getSplits() throws UserException {
+        if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
+            return getIncrementalSplits();
+        }
+        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
+        getPartitionSplits(prunedPartitions, splits);
+        return splits;
+    }
+
+    public List<Split> getNextBatch(int maxBatchSize) throws UserException {
+        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
+        int numPartitions = 0;
+        while (splits.size() < maxBatchSize && prunedPartitionsIter.hasNext()) 
{
+            List<HivePartition> partitions = new 
ArrayList<>(NUM_PARTITIONS_PER_LOOP);
+            for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && 
prunedPartitionsIter.hasNext(); ++i) {
+                partitions.add(prunedPartitionsIter.next());
+                numPartitions++;
+            }
+            getPartitionSplits(partitions, splits);
+        }
+        if (splits.size() / numPartitions > numSplitsPerPartition) {
+            numSplitsPerPartition = splits.size() / numPartitions;
+        }
         return splits;
     }
 
+    public boolean hasNext() {
+        return prunedPartitionsIter.hasNext();
+    }
+
+    public boolean isBatchMode() {
+        if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
+            return false;
+        }
+        int numPartitions = 
ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+        return numPartitions >= 0 && prunedPartitions.size() >= numPartitions;
+    }
+
+    public int numApproximateSplits() {

Review Comment:
   @Override



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java:
##########
@@ -370,9 +375,51 @@ public List<Split> getSplits() throws UserException {
         } catch (InterruptedException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
+    }
+
+    @Override
+    public List<Split> getSplits() throws UserException {
+        if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
+            return getIncrementalSplits();
+        }
+        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
+        getPartitionSplits(prunedPartitions, splits);
+        return splits;
+    }
+
+    public List<Split> getNextBatch(int maxBatchSize) throws UserException {

Review Comment:
   @Override



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java:
##########
@@ -370,9 +375,51 @@ public List<Split> getSplits() throws UserException {
         } catch (InterruptedException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
+    }
+
+    @Override
+    public List<Split> getSplits() throws UserException {
+        if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
+            return getIncrementalSplits();
+        }
+        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
+        getPartitionSplits(prunedPartitions, splits);
+        return splits;
+    }
+
+    public List<Split> getNextBatch(int maxBatchSize) throws UserException {
+        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
+        int numPartitions = 0;
+        while (splits.size() < maxBatchSize && prunedPartitionsIter.hasNext()) 
{
+            List<HivePartition> partitions = new 
ArrayList<>(NUM_PARTITIONS_PER_LOOP);
+            for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && 
prunedPartitionsIter.hasNext(); ++i) {
+                partitions.add(prunedPartitionsIter.next());
+                numPartitions++;
+            }
+            getPartitionSplits(partitions, splits);
+        }
+        if (splits.size() / numPartitions > numSplitsPerPartition) {
+            numSplitsPerPartition = splits.size() / numPartitions;
+        }
         return splits;
     }
 
+    public boolean hasNext() {
+        return prunedPartitionsIter.hasNext();
+    }
+
+    public boolean isBatchMode() {

Review Comment:
   @Override



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java:
##########
@@ -217,6 +229,46 @@ protected List<Split> getSplits() throws UserException {
         }
     }
 
+    public List<Split> getNextBatch(int maxBatchSize) throws UserException {

Review Comment:
   @Override



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java:
##########
@@ -217,6 +229,46 @@ protected List<Split> getSplits() throws UserException {
         }
     }
 
+    public List<Split> getNextBatch(int maxBatchSize) throws UserException {
+        try {
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) 
hmsTable.getCatalog());
+            String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+            List<Split> allFiles = Lists.newArrayList();
+            int numPartitions = 0;
+            while (allFiles.size() < maxBatchSize && 
prunedPartitionsIter.hasNext()) {
+                List<HivePartition> partitions = new 
ArrayList<>(NUM_PARTITIONS_PER_LOOP);
+                for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && 
prunedPartitionsIter.hasNext(); ++i) {
+                    partitions.add(prunedPartitionsIter.next());
+                    numPartitions++;
+                }
+                getFileSplitByPartitions(cache, partitions, allFiles, 
bindBrokerName);
+            }
+            if (allFiles.size() / numPartitions > numSplitsPerPartition) {
+                numSplitsPerPartition = allFiles.size() / numPartitions;
+            }
+            return allFiles;
+        } catch (Throwable t) {
+            LOG.warn("get file split failed for table: {}", 
hmsTable.getName(), t);
+            throw new UserException(
+                    "get file split failed for table: " + hmsTable.getName() + 
", err: " + Util.getRootCauseMessage(t),
+                    t);
+        }
+    }
+
+    public boolean hasNext() {
+        return prunedPartitionsIter.hasNext();
+    }
+
+    public boolean isBatchMode() {
+        int numPartitions = 
ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();

Review Comment:
   Is it possible that there are few partitions (<= 1024), but each 
partition has a lot of files?



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java:
##########
@@ -370,9 +375,51 @@ public List<Split> getSplits() throws UserException {
         } catch (InterruptedException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
+    }
+
+    @Override
+    public List<Split> getSplits() throws UserException {
+        if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
+            return getIncrementalSplits();
+        }
+        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
+        getPartitionSplits(prunedPartitions, splits);
+        return splits;
+    }
+
+    public List<Split> getNextBatch(int maxBatchSize) throws UserException {
+        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
+        int numPartitions = 0;
+        while (splits.size() < maxBatchSize && prunedPartitionsIter.hasNext()) 
{
+            List<HivePartition> partitions = new 
ArrayList<>(NUM_PARTITIONS_PER_LOOP);
+            for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && 
prunedPartitionsIter.hasNext(); ++i) {
+                partitions.add(prunedPartitionsIter.next());
+                numPartitions++;
+            }
+            getPartitionSplits(partitions, splits);
+        }
+        if (splits.size() / numPartitions > numSplitsPerPartition) {
+            numSplitsPerPartition = splits.size() / numPartitions;
+        }
         return splits;
     }
 
+    public boolean hasNext() {

Review Comment:
   @Override



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java:
##########
@@ -193,6 +202,26 @@ protected void doInitialize() throws UserException {
         } else {
             incrementalRelation = null;
         }
+
+        timeline = 
hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+        if (desc.getRef().getTableSnapshot() != null) {
+            queryInstant = desc.getRef().getTableSnapshot().getTime();
+            snapshotTimestamp = Option.of(queryInstant);
+        } else {
+            Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
+            if (!snapshotInstant.isPresent()) {
+                prunedPartitions = Collections.emptyList();
+                prunedPartitionsIter = prunedPartitions.iterator();
+                return;
+            }
+            queryInstant = snapshotInstant.get().getTimestamp();
+            snapshotTimestamp = Option.empty();
+        }
+        // Non partition table will get one dummy partition
+        prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
+                HiveMetaStoreClientHelper.getConfiguration(hmsTable),
+                () -> getPrunedPartitions(hudiClient, snapshotTimestamp));
+        prunedPartitionsIter = prunedPartitions.iterator();

Review Comment:
   Previously, partition pruning was done in the `doFinalize()` method; now you have moved it to the 
`doInitialize()` method.
   Are you sure we can do partition pruning when initializing?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to