deniskuzZ commented on code in PR #5174:
URL: https://github.com/apache/hive/pull/5174#discussion_r1553929629


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java:
##########
@@ -154,6 +173,107 @@ private void prepare(InputInitializerContext initializerContext) throws IOExcept
     LOG.info("SplitLocationProvider: " + splitLocationProvider);
   }
 
+  /**
+   * SplitSerializer is a helper class for taking care of serializing splits to the tez scratch dir
+   * when a size criteria defined by "hive.tez.input.fs.serialization.threshold" is met.
+   * It utilizes an ExecutorService for parallel writes to prevent a single split write operation
+   * becoming the bottleneck (as write() is called from a loop currently).
+   */
+  class SplitSerializer implements AutoCloseable {
+    private static final String FILE_PATH_FORMAT = "%s/events/%s/%d_%s_InputDataInformationEvent_%d";
+
+    // fields needed for filepath
+    private String queryId;
+    private String inputName;
+    private int vertexId;
+    private Path appStagingPath;
+    // filesystem and executor
+    private FileSystem fs;
+    private ExecutorService executor;
+
+    private AtomicBoolean anyTaskFailed;
+    private List<Future<?>> asyncTasks;
+
+    @VisibleForTesting
+    SplitSerializer() throws IOException {
+      if (getContext() == null) {
+        // this typically happens in case the split generation is not in the Tez AM
+        // in that case we're not interested in this feature, so it's fine to return here and fail later
+        return;
+      }
+      queryId = jobConf.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname);
+      inputName = getContext().getInputName();
+      vertexId = getContext().getVertexId();
+      appStagingPath = TezCommonUtils.getTezSystemStagingPath(conf, getContext().getApplicationId().toString());
+
+      fs = appStagingPath.getFileSystem(jobConf);
+
+      int numThreads = HiveConf.getIntVar(jobConf, HiveConf.ConfVars.HIVE_TEZ_INPUT_FS_SERIALIZATION_THREADS);
+      executor = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder().setNameFormat("HiveSplitGenerator.SplitSerializer Thread - " + "#%d").build());
+      anyTaskFailed = new AtomicBoolean(false);
+      asyncTasks = new ArrayList<>();
+    }
+
+    @VisibleForTesting
+    InputDataInformationEvent write(int count, MRSplitProto mrSplit) {
+      InputDataInformationEvent diEvent;
+      Path filePath = getSerializedFilePath(count);
+
+      Runnable task = () -> {
+        if (!anyTaskFailed.get()) {
+          try {
+            writeSplit(count, mrSplit, filePath);
+          } catch (IOException e) {
+            anyTaskFailed.set(true);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+      asyncTasks.add(CompletableFuture.runAsync(task, executor));
+      return InputDataInformationEvent.createWithSerializedPath(count, filePath.toString());
+    }
+
+    @VisibleForTesting
+    void writeSplit(int count, MRSplitProto mrSplit, Path filePath) throws IOException {
+      long fileWriteStarted = Time.monotonicNow();
+      try (FSDataOutputStream out = fs.create(filePath, false)) {
+        mrSplit.writeTo(out);
+      }
+      LOG.debug("Split #{} event to output path: {} written in {} ms", count, filePath,
+          Time.monotonicNow() - fileWriteStarted);
+    }
+
+    Path getSerializedFilePath(int index) {
+      // e.g. staging_dir/events/queryid/inputtable_InputDataInformationEvent_0
+      return new Path(String.format(FILE_PATH_FORMAT, appStagingPath, queryId, vertexId, inputName, index));
+    }
+
+    @Override
+    public void close() {
+      try {
+        CompletableFuture.allOf(asyncTasks.toArray(new CompletableFuture[0])).get();
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while generating splits", e);
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) { // ExecutionException wraps the original exception
+        LOG.error("Exception while generating splits", e.getCause());
+        throw new RuntimeException(e.getCause());
+      } finally {
+        if (executor == null) {

Review Comment:
   != null
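   
   A minimal sketch of how the finally block could read with the suggested check; the body of the if is cut off in the diff above, so the shutdownNow() cleanup shown here is an assumption rather than the PR's actual code:
   
       } finally {
         // shut the executor down only when it was actually created
         // (the constructor returns early when getContext() is null, leaving executor unset)
         if (executor != null) {
           executor.shutdownNow();
         }
       }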



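More generally, the class javadoc above describes write() being called from a loop while the actual file writes happen on the executor. A purely illustrative caller-side sketch (hypothetical names, assumed to run inside HiveSplitGenerator, not code from the PR):

    // 'splits' is a placeholder for the MRSplitProto instances produced by split generation
    List<InputDataInformationEvent> events = new ArrayList<>();
    try (SplitSerializer serializer = new SplitSerializer()) {
      int count = 0;
      for (MRSplitProto mrSplit : splits) {
        // write() only schedules the file write on the executor and immediately
        // returns an event that points at the serialized path
        events.add(serializer.write(count++, mrSplit));
      }
    } // close() blocks until all pending writes finish and rethrows any failure

The point of the executor is that the loop itself stays cheap while the filesystem writes overlap, so a single slow write does not stall split generation.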
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

