This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 9d2b61b TEZ-4204: Data race in RootInputInitializerManager (Mustafa
Iman via Ashutosh Chauhan)
9d2b61b is described below
commit 9d2b61b576a2421ec4fb813489d896d2b89fcce9
Author: Mustafa Iman <[email protected]>
AuthorDate: Thu Jul 30 12:19:36 2020 +0200
TEZ-4204: Data race in RootInputInitializerManager (Mustafa Iman via
Ashutosh Chauhan)
Signed-off-by: Laszlo Bodor <[email protected]>
---
.../dag/app/dag/RootInputInitializerManager.java | 74 ++++++++++++++++------
1 file changed, 53 insertions(+), 21 deletions(-)
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 5ce0050..9194c1d 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -25,6 +25,7 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -33,6 +34,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
@@ -104,36 +107,65 @@ public class RootInputInitializerManager {
}
- public void runInputInitializers(
- List<RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor>> inputs, List<TezEvent> pendingInitializerEvents) {
-
- executor.submit(() -> createAndStartInitializing(inputs,
pendingInitializerEvents));
+ public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor>> inputs,
+ List<TezEvent> pendingInitializerEvents) {
+ List<InitializerWrapper> initWrappers = createInitializerWrappers(inputs);
+ if (!initWrappers.isEmpty()) {
+ executor.submit(() ->
createAndStartInitializing(pendingInitializerEvents, initWrappers));
+ }
}
- private void
createAndStartInitializing(List<RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor>> inputs, List<TezEvent> pendingInitializerEvents) {
+ /**
+ * Create input wrappers for all inputs in parallel.
+ *
+ * @param inputs
+ * @return
+ */
+ private List<InitializerWrapper> createInitializerWrappers(
+ List<RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor>> inputs) {
String current = null;
+ final List<InitializerWrapper> result = Collections.synchronizedList(new
ArrayList<>());
try {
- List<InitializerWrapper> result = new ArrayList<>();
+ final List<Future<Void>> fResults = new ArrayList<>();
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
each : inputs) {
current = each.getName();
- InitializerWrapper initializer = createInitializerWrapper(each);
- initializerMap.put(each.getName(), initializer);
- registerPendingVertex(each, initializer);
- result.add(initializer);
+ fResults.add(executor.submit(() -> {
+ InitializerWrapper initializer = createInitializerWrapper(each);
+ initializerMap.put(each.getName(), initializer);
+ registerPendingVertex(each, initializer);
+ result.add(initializer);
+ return null;
+ }));
}
- handleInitializerEvents(pendingInitializerEvents);
- pendingInitializerEvents.clear();
- for (InitializerWrapper inputWrapper : result) {
- executor.submit(() -> runInitializerAndProcessResult(inputWrapper));
+ for(Future<Void> f : fResults) {
+ f.get();
}
- } catch (Throwable t) {
- VertexImpl vertexImpl = (VertexImpl) vertex;
- String msg = "Fail to create InputInitializerManager, " +
ExceptionUtils.getStackTrace(t);
- LOG.info(msg);
- vertexImpl.finished(FAILED, VertexTerminationCause.INIT_FAILURE, msg);
- eventHandler.handle(new VertexEventRootInputFailed(vertex.getVertexId(),
current,
- new
AMUserCodeException(AMUserCodeException.Source.InputInitializer, t)));
+ } catch (InterruptedException | ExecutionException t) {
+ failVertex(t, current);
+ }
+ return result;
+ }
+
+ void failVertex(Throwable t, String inputName) {
+ VertexImpl vertexImpl = (VertexImpl) vertex;
+ String msg = "Fail to create InputInitializerManager, " +
ExceptionUtils.getStackTrace(t);
+ LOG.info(msg);
+ vertexImpl.finished(FAILED, VertexTerminationCause.INIT_FAILURE, msg);
+ eventHandler.handle(new VertexEventRootInputFailed(vertex.getVertexId(),
inputName,
+ new AMUserCodeException(AMUserCodeException.Source.InputInitializer,
t)));
+ }
+ /**
+ * Start initializers in parallel.
+ *
+ * @param pendingInitializerEvents
+ * @param result
+ */
+ private void createAndStartInitializing(List<TezEvent>
pendingInitializerEvents, List<InitializerWrapper> result) {
+ handleInitializerEvents(pendingInitializerEvents);
+ pendingInitializerEvents.clear();
+ for (InitializerWrapper inputWrapper : result) {
+ executor.submit(() -> runInitializerAndProcessResult(inputWrapper));
}
}