deniskuzZ commented on code in PR #5174:
URL: https://github.com/apache/hive/pull/5174#discussion_r1553929629
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java:
##########
@@ -154,6 +173,107 @@ private void prepare(InputInitializerContext
initializerContext) throws IOExcept
LOG.info("SplitLocationProvider: " + splitLocationProvider);
}
  /**
   * SplitSerializer is a helper class for taking care of serializing splits to the tez scratch dir
   * when a size criterion defined by "hive.tez.input.fs.serialization.threshold" is met.
   * It utilizes an ExecutorService for parallel writes to prevent a single split write operation
   * becoming the bottleneck (as write() is called from a loop currently).
   */
  class SplitSerializer implements AutoCloseable {
    // Yields: <appStagingPath>/events/<queryId>/<vertexId>_<inputName>_InputDataInformationEvent_<index>
    private static final String FILE_PATH_FORMAT = "%s/events/%s/%d_%s_InputDataInformationEvent_%d";

    // fields needed for filepath
    private String queryId;
    private String inputName;
    private int vertexId;
    private Path appStagingPath;
    // filesystem and executor
    private FileSystem fs;
    private ExecutorService executor;

    // flipped to true by the first failed async write; later tasks check it to short-circuit
    private AtomicBoolean anyTaskFailed;
    // futures of every scheduled split write, awaited (and surfaced) in close()
    private List<Future<?>> asyncTasks;
+ @VisibleForTesting
+ SplitSerializer() throws IOException {
+ if (getContext() == null) {
+ // this typically happens in case the split generation is not in the
Tez AM
+ // in that case we're not interested in this feature, so it's fine to
return here and fail later
+ return;
+ }
+ queryId = jobConf.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname);
+ inputName = getContext().getInputName();
+ vertexId = getContext().getVertexId();
+ appStagingPath = TezCommonUtils.getTezSystemStagingPath(conf,
getContext().getApplicationId().toString());
+
+ fs = appStagingPath.getFileSystem(jobConf);
+
+ int numThreads = HiveConf.getIntVar(jobConf,
HiveConf.ConfVars.HIVE_TEZ_INPUT_FS_SERIALIZATION_THREADS);
+ executor = Executors.newFixedThreadPool(numThreads,
+ new
ThreadFactoryBuilder().setNameFormat("HiveSplitGenerator.SplitSerializer Thread
- " + "#%d").build());
+ anyTaskFailed = new AtomicBoolean(false);
+ asyncTasks = new ArrayList<>();
+ }
+
+ @VisibleForTesting
+ InputDataInformationEvent write(int count, MRSplitProto mrSplit) {
+ InputDataInformationEvent diEvent;
+ Path filePath = getSerializedFilePath(count);
+
+ Runnable task = () -> {
+ if (!anyTaskFailed.get()) {
+ try {
+ writeSplit(count, mrSplit, filePath);
+ } catch (IOException e) {
+ anyTaskFailed.set(true);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ asyncTasks.add(CompletableFuture.runAsync(task, executor));
+ return InputDataInformationEvent.createWithSerializedPath(count,
filePath.toString());
+ }
+
+ @VisibleForTesting
+ void writeSplit(int count, MRSplitProto mrSplit, Path filePath) throws
IOException {
+ long fileWriteStarted = Time.monotonicNow();
+ try (FSDataOutputStream out = fs.create(filePath, false)) {
+ mrSplit.writeTo(out);
+ }
+ LOG.debug("Split #{} event to output path: {} written in {} ms", count,
filePath,
+ fileWriteStarted - Time.monotonicNow());
+ }
+
    // Builds the target path for one serialized split; FILE_PATH_FORMAT includes the vertex id.
    Path getSerializedFilePath(int index) {
      // e.g. staging_dir/events/queryid/1_inputtable_InputDataInformationEvent_0
      return new Path(String.format(FILE_PATH_FORMAT, appStagingPath, queryId, vertexId, inputName, index));
    }
+
+ @Override
+ public void close() {
+ try {
+ CompletableFuture.allOf(asyncTasks.toArray(new
CompletableFuture[0])).get();
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted while generating splits", e);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {// ExecutionException wraps the original
exception
+ LOG.error("Exception while generating splits", e.getCause());
+ throw new RuntimeException(e.getCause());
+ } finally {
+ if (executor == null) {
Review Comment:
!= null
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]