aokolnychyi commented on code in PR #8123:
URL: https://github.com/apache/iceberg/pull/8123#discussion_r1309282075


##########
core/src/main/java/org/apache/iceberg/DistributedDataBatchScan.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.TableScanUtil;
+
+abstract class DistributedDataBatchScan
+    extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements BatchScan {
+
+  private static final int NUM_LOCAL_CORES = Runtime.getRuntime().availableProcessors();
+  private static final long METADATA_LIMIT_PER_LOCAL_CORE = 128 * 1024 * 1024; // 128 MB
+  private static final int NUM_WORKER_THREADS = 2;
+
+  protected DistributedDataBatchScan(Table table, Schema schema, TableScanContext context) {
+    super(table, schema, context);
+  }
+
+  protected abstract int remoteParallelism();
+
+  protected abstract PlanningMode dataPlanningMode();
+
+  protected abstract List<DataFile> planDataRemotely(List<ManifestFile> manifests);
+
+  protected abstract PlanningMode deletePlanningMode();
+
+  protected abstract List<DeleteFile> planDeletesRemotely(List<ManifestFile> manifests);
+
+  protected abstract void releaseResources();
+
+  @Override
+  protected boolean useSnapshotSchema() {
+    return true;
+  }
+
+  @Override
+  protected CloseableIterable<ScanTask> doPlanFiles() {
+    Snapshot snapshot = snapshot();
+
+    List<ManifestFile> dataManifests = findMatchingDataManifests(snapshot);
+    List<ManifestFile> deleteManifests = findMatchingDeleteManifests(snapshot);
+
+    boolean planDataRemotely = shouldPlanRemotely(dataPlanningMode(), dataManifests);
+    boolean planDeletesRemotely = shouldPlanRemotely(deletePlanningMode(), deleteManifests);
+
+    if (planDataRemotely || planDeletesRemotely) {
+      ExecutorService workerPool = newWorkerPool();
+
+      CompletableFuture<CloseableIterable<DataFile>> dataFilesFuture =
+          newDataFilesFuture(dataManifests, planDataRemotely, workerPool);
+
+      CompletableFuture<DeleteFileIndex> deletesFuture =
+          newDeletesFuture(deleteManifests, planDeletesRemotely, workerPool);
+
+      try {
+        CloseableIterable<DataFile> dataFiles = dataFilesFuture.join();
+        DeleteFileIndex deletes = deletesFuture.join();
+        return createTasks(dataFiles, deletes);
+
+      } catch (CompletionException e) {
+        dataFilesFuture.cancel(true /* may interrupt */);
+        deletesFuture.cancel(true /* may interrupt */);
+        throw new RuntimeException("Failed to plan files", e);
+
+      } finally {
+        workerPool.shutdown();
+        releaseResources();
+      }
+
+    } else {
+      return planFilesLocally(dataManifests, deleteManifests);
+    }
+  }
+
+  private CompletableFuture<CloseableIterable<DataFile>> newDataFilesFuture(
+      List<ManifestFile> manifests, boolean planRemotely, ExecutorService workerPool) {
+
+    return CompletableFuture.supplyAsync(
+        () -> {
+          if (planRemotely) {
+            scanMetrics().scannedDataManifests().increment(manifests.size());
+            List<DataFile> dataFiles = planDataRemotely(manifests);
+            int skippedDataFilesCount = totalFilesCount(manifests) - dataFiles.size();
+            scanMetrics().skippedDataFiles().increment(skippedDataFilesCount);
+            return CloseableIterable.withNoopClose(dataFiles);
+          } else {
+            ManifestGroup manifestGroup = newManifestGroup(manifests, ImmutableList.of());
+            return CloseableIterable.transform(manifestGroup.entries(), ManifestEntry::file);
+          }
+        },
+        workerPool);
+  }
+
+  private CompletableFuture<DeleteFileIndex> newDeletesFuture(
+      List<ManifestFile> manifests, boolean planRemotely, ExecutorService workerPool) {
+
+    return CompletableFuture.supplyAsync(
+        () -> {
+          DeleteFileIndex.Builder builder;
+
+          if (planRemotely) {
+            scanMetrics().scannedDeleteManifests().increment(manifests.size());
+            List<DeleteFile> deleteFiles = planDeletesRemotely(manifests);
+            int skippedDeleteFilesCount = totalFilesCount(manifests) - deleteFiles.size();
+            scanMetrics().skippedDeleteFiles().increment(skippedDeleteFilesCount);
+            builder = DeleteFileIndex.builderForFiles(io(), deleteFiles);
+          } else {
+            builder = DeleteFileIndex.builderFor(io(), manifests);
+          }
+
+          return builder
+              .specsById(table().specs())
+              .filterData(filter())
+              .caseSensitive(isCaseSensitive())
+              .planWith(planExecutor())
+              .scanMetrics(scanMetrics())
+              .build();
+        },
+        workerPool);
+  }
+
+  private CloseableIterable<ScanTask> planFilesLocally(
+      List<ManifestFile> dataManifests, List<ManifestFile> deleteManifests) {
+    ManifestGroup manifestGroup = newManifestGroup(dataManifests, deleteManifests);
+    return asScanTasks(manifestGroup.planFiles());
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private CloseableIterable<ScanTask> createTasks(
+      CloseableIterable<DataFile> dataFiles, DeleteFileIndex deletes) {
+
+    String schemaString = SchemaParser.toJson(tableSchema());
+    LoadingCache<Integer, String> specStringCache = specCache(PartitionSpecParser::toJson);
+    LoadingCache<Integer, ResidualEvaluator> residualCache = specCache(this::newResidualEvaluator);
+
+    return CloseableIterable.transform(
+        dataFiles,
+        dataFile -> {
+          DeleteFile[] deleteFiles = deletes.forDataFile(dataFile);
+
+          scanMetrics().totalFileSizeInBytes().increment(dataFile.fileSizeInBytes());
+          scanMetrics().resultDataFiles().increment();
+          scanMetrics().resultDeleteFiles().increment(deleteFiles.length);
+
+          for (DeleteFile deleteFile : deleteFiles) {
+            scanMetrics().totalDeleteFileSizeInBytes().increment(deleteFile.fileSizeInBytes());
+          }
+
+          String specString = specStringCache.get(dataFile.specId());
+          ResidualEvaluator residuals = residualCache.get(dataFile.specId());
+
+          return new BaseFileScanTask(dataFile, deleteFiles, schemaString, specString, residuals);
+        });
+  }
+
+  private List<ManifestFile> findMatchingDataManifests(Snapshot snapshot) {
+    List<ManifestFile> dataManifests = snapshot.dataManifests(io());
+    scanMetrics().totalDataManifests().increment(dataManifests.size());
+
+    List<ManifestFile> matchingDataManifests = filterManifests(dataManifests);
+    int skippedDataManifestsCount = dataManifests.size() - matchingDataManifests.size();
+    scanMetrics().skippedDataManifests().increment(skippedDataManifestsCount);
+
+    return matchingDataManifests;
+  }
+
+  private List<ManifestFile> findMatchingDeleteManifests(Snapshot snapshot) {
+    List<ManifestFile> deleteManifests = snapshot.deleteManifests(io());
+    scanMetrics().totalDeleteManifests().increment(deleteManifests.size());
+
+    List<ManifestFile> matchingDeleteManifests = filterManifests(deleteManifests);
+    int skippedDeleteManifestsCount = deleteManifests.size() - matchingDeleteManifests.size();
+    scanMetrics().skippedDeleteManifests().increment(skippedDeleteManifestsCount);
+
+    return matchingDeleteManifests;
+  }
+
+  private List<ManifestFile> filterManifests(List<ManifestFile> manifests) {
+    LoadingCache<Integer, ManifestEvaluator> evalCache = specCache(this::newManifestEvaluator);
+
+    return manifests.stream()
+        .filter(manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles())
+        .filter(manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest))
+        .collect(Collectors.toList());
+  }
+
+  private boolean shouldPlanRemotely(PlanningMode mode, List<ManifestFile> manifests) {
+    switch (mode) {
+      case LOCAL:

Review Comment:
   Changed.
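
A note on the planning flow shown in the diff above: doPlanFiles() submits data planning and delete planning as two futures on a short-lived worker pool, joins both, cancels the pair if either fails, and shuts the pool down in a finally block. The snippet below is a minimal, self-contained sketch of that join/cancel pattern only; every name in it (PlanningPatternDemo, planData, planDeletes, the String placeholders for files) is hypothetical and does not reference Iceberg's API.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PlanningPatternDemo {

  public static void main(String[] args) {
    // a small dedicated pool, mirroring the short-lived worker pool in doPlanFiles()
    ExecutorService workerPool = Executors.newFixedThreadPool(2);

    CompletableFuture<List<String>> dataFuture =
        CompletableFuture.supplyAsync(PlanningPatternDemo::planData, workerPool);
    CompletableFuture<List<String>> deletesFuture =
        CompletableFuture.supplyAsync(PlanningPatternDemo::planDeletes, workerPool);

    try {
      // join() rethrows a failure from either task as a CompletionException
      List<String> dataFiles = dataFuture.join();
      List<String> deleteFiles = deletesFuture.join();
      System.out.println(dataFiles.size() + " data files, " + deleteFiles.size() + " delete files");

    } catch (CompletionException e) {
      // cancel both futures so no planning work keeps running after a failure
      dataFuture.cancel(true /* may interrupt */);
      deletesFuture.cancel(true /* may interrupt */);
      throw new RuntimeException("Failed to plan files", e);

    } finally {
      workerPool.shutdown();
    }
  }

  // hypothetical stand-ins for the remote/local planning steps; they return placeholder file names
  private static List<String> planData() {
    return List.of("data-1.parquet", "data-2.parquet");
  }

  private static List<String> planDeletes() {
    return List.of("deletes-1.parquet");
  }
}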


