This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b9e99ead [#965] feat(tez): support remote shuffle for tez framework 
(#966)
b9e99ead is described below

commit b9e99ead382be19b115f658ddd99b923554f6ad1
Author: zhengchenyu <[email protected]>
AuthorDate: Thu Jun 29 17:06:49 2023 +0800

    [#965] feat(tez): support remote shuffle for tez framework (#966)
    
    ### What changes were proposed in this pull request?
    
    A method to support remote shuffle for tez framework.
    
    ### How was this patch tested?
    
    test in real cluster and integration test.
---
 .../java/org/apache/tez/common/RssTezUtils.java    |  68 +++++++
 .../org/apache/tez/dag/app/RssDAGAppMaster.java    | 112 +++++++++--
 .../apache/tez/dag/app/RssDAGAppMasterTest.java    | 219 +++++++++++++++++++++
 client-tez/src/test/resources/log4j.properties     |  22 +++
 4 files changed, 407 insertions(+), 14 deletions(-)

diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java 
b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index ed4df3e8..c980ddb8 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -27,6 +27,24 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
+import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
+import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
+import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
+import org.apache.tez.runtime.library.input.RssConcatenatedMergedKeyValueInput;
+import 
org.apache.tez.runtime.library.input.RssConcatenatedMergedKeyValuesInput;
+import org.apache.tez.runtime.library.input.RssOrderedGroupedInputLegacy;
+import org.apache.tez.runtime.library.input.RssOrderedGroupedKVInput;
+import org.apache.tez.runtime.library.input.RssOrderedGroupedMergedKVInput;
+import org.apache.tez.runtime.library.input.RssUnorderedKVInput;
+import org.apache.tez.runtime.library.input.UnorderedKVInput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.RssOrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.RssUnorderedKVOutput;
+import org.apache.tez.runtime.library.output.RssUnorderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -325,4 +343,54 @@ public class RssTezUtils {
       }
     }
   }
+
+  public static String replaceRssOutputClassName(String className) {
+    if (className.equals(OrderedPartitionedKVOutput.class.getName())) {
+      LOG.info("Output class name will transient from {} to {}", className,
+          RssOrderedPartitionedKVOutput.class.getName());
+      return RssOrderedPartitionedKVOutput.class.getName();
+    } else if (className.equals(UnorderedKVOutput.class.getName())) {
+      LOG.info("Output class name will transient from {} to {}", className,
+          RssUnorderedKVOutput.class.getName());
+      return RssUnorderedKVOutput.class.getName();
+    } else if (className.equals(UnorderedPartitionedKVOutput.class.getName())) 
{
+      LOG.info("Output class name will transient from {} to {}", className,
+          RssUnorderedPartitionedKVOutput.class.getName());
+      return RssUnorderedPartitionedKVOutput.class.getName();
+    } else {
+      LOG.info("Unexpected kv output class name {}.", className);
+      return className;
+    }
+  }
+
+  public static String replaceRssInputClassName(String className) {
+    if (className.equals(OrderedGroupedKVInput.class.getName())) {
+      LOG.info("Input class name will transient from {} to {}", className,
+          RssOrderedGroupedKVInput.class.getName());
+      return RssOrderedGroupedKVInput.class.getName();
+    } else if (className.equals(OrderedGroupedMergedKVInput.class.getName())) {
+      LOG.info("Input class name will transient from {} to {}", className,
+          RssOrderedGroupedMergedKVInput.class.getName());
+      return RssOrderedGroupedMergedKVInput.class.getName();
+    } else if (className.equals(OrderedGroupedInputLegacy.class.getName())) {
+      LOG.info("Input class name will transient from {} to {}", className,
+          RssOrderedGroupedInputLegacy.class.getName());
+      return RssOrderedGroupedInputLegacy.class.getName();
+    } else if (className.equals(UnorderedKVInput.class.getName())) {
+      LOG.info("Input class name will transient from {} to {}", className,
+          RssUnorderedKVInput.class.getName());
+      return RssUnorderedKVInput.class.getName();
+    } else if 
(className.equals(ConcatenatedMergedKeyValueInput.class.getName())) {
+      LOG.info("Input class name will transient from {} to {}", className,
+          RssConcatenatedMergedKeyValueInput.class.getName());
+      return RssConcatenatedMergedKeyValueInput.class.getName();
+    } else if 
(className.equals(ConcatenatedMergedKeyValuesInput.class.getName())) {
+      LOG.info("Input class name will transient from {} to {}", className,
+          RssConcatenatedMergedKeyValuesInput.class.getName());
+      return RssConcatenatedMergedKeyValuesInput.class.getName();
+    } else {
+      LOG.info("Unexpected kv input class name {}.", className);
+      return className;
+    }
+  }
 }
diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index 16086897..cb6146bf 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -18,6 +18,7 @@
 package org.apache.tez.dag.app;
 
 import java.lang.reflect.Field;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -26,9 +27,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Options;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -48,18 +47,30 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.helpers.Loader;
+import org.apache.log4j.helpers.OptionConverter;
 import org.apache.tez.common.RssTezConfig;
 import org.apache.tez.common.RssTezUtils;
 import org.apache.tez.common.TezClassLoader;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.VersionInfo;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +78,8 @@ import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ThreadUtils;
 
+import static org.apache.log4j.LogManager.CONFIGURATOR_CLASS_KEY;
+import static org.apache.log4j.LogManager.DEFAULT_CONFIGURATION_KEY;
 import static org.apache.tez.common.TezCommonUtils.TEZ_SYSTEM_SUB_DIR;
 
 public class RssDAGAppMaster extends DAGAppMaster {
@@ -158,6 +171,13 @@ public class RssDAGAppMaster extends DAGAppMaster {
     mayCloseTezSlowStart(conf);
   }
 
+  @Override
+  protected DAG createDAG(DAGProtos.DAGPlan dagPB) {
+    DAGImpl dag = createDAG(dagPB, null);
+    registerStateEnteredCallback(dag);
+    return dag;
+  }
+
   @Override
   public String submitDAGToAppMaster(DAGProtos.DAGPlan dagPlan, Map<String, 
LocalResource> additionalResources)
           throws TezException {
@@ -206,6 +226,24 @@ public class RssDAGAppMaster extends DAGAppMaster {
    */
   public static void main(String[] args) {
     try {
+      // RssDAGAppMaster is introduced through a trick: the config 
tez.am.launch.cmd-opts.
+      // As a result, properties set on the command line would be ignored, 
so we must reload them.
+      boolean sessionModeCliOption = false;
+      for (int i = 0; i < args.length; i++) {
+        if (args[i].startsWith("-D")) {
+          String[] property = args[i].split("=");
+          if (property.length < 2) {
+            System.setProperty(property[0].substring(2), "");
+          } else {
+            System.setProperty(property[0].substring(2), property[1]);
+          }
+        } else if (args[i].contains("--session") || args[i].contains("-s")) {
+          sessionModeCliOption = true;
+        }
+      }
+      // The log4j config is only loaded in the static code block of 
LogManager, so we must reconfigure.
+      reconfigureLog4j();
+
       // Install the tez class loader, which can be used add new resources
       TezClassLoader.setupTezClassLoader();
       Thread.setDefaultUncaughtExceptionHandler(new 
YarnUncaughtExceptionHandler());
@@ -228,15 +266,7 @@ public class RssDAGAppMaster extends DAGAppMaster {
       String jobUserName = System
               .getenv(ApplicationConstants.Environment.USER.name());
 
-      // Command line options
-      Options opts = new Options();
-      opts.addOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION,
-              false, "Run Tez Application Master in Session mode");
-
-      CommandLine cliParser = new GnuParser().parse(opts, args);
-      boolean sessionModeCliOption = 
cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
-
-      LOG.info("Creating DAGAppMaster for "
+      LOG.info("Creating RssDAGAppMaster for "
               + "applicationId=" + applicationAttemptId.getApplicationId()
               + ", attemptNum=" + applicationAttemptId.getAttemptId()
               + ", AMContainerId=" + containerId
@@ -294,7 +324,7 @@ public class RssDAGAppMaster extends DAGAppMaster {
       initAndStartRSSClient(appMaster, conf, applicationAttemptId);
       initAndStartAppMaster(appMaster, conf);
     } catch (Throwable t) {
-      LOG.error("Error starting DAGAppMaster", t);
+      LOG.error("Error starting RssDAGAppMaster", t);
       System.exit(1);
     }
   }
@@ -358,8 +388,62 @@ public class RssDAGAppMaster extends DAGAppMaster {
         }
       }
 
-      RssDAGAppMaster.LOG.info("MRAppMaster received a signal. Signaling 
RMCommunicator and JobHistoryEventHandler.");
+      RssDAGAppMaster.LOG.info(
+          "RssDAGAppMaster received a signal. Signaling RMCommunicator and 
JobHistoryEventHandler.");
       this.appMaster.stop();
     }
   }
+
+  @VisibleForTesting
+  public static void registerStateEnteredCallback(DAGImpl dag) {
+    StateMachineTez
+        stateMachine = (StateMachineTez) getPrivateField(dag, "stateMachine");
+    stateMachine.registerStateEnteredCallback(DAGState.INITED, new 
DagInitialCallback());
+  }
+
+  /**
+   * State callback registered for {@link DAGState#INITED}: rewrites the output and input
+   * class names of every edge of the DAG through {@link RssTezUtils}.
+   */
+  static class DagInitialCallback implements OnStateChangedCallback<DAGState, DAGImpl> {
+
+    /**
+     * Replaces, via reflection, the {@code className} held by each edge's source output
+     * descriptor and destination input descriptor.
+     *
+     * @param dag the DAG whose {@code edges} field is inspected
+     * @param dagState the state that was entered (not used)
+     * @throws TezUncheckedException if reading or writing any reflective field fails
+     */
+    @Override
+    public void onStateChanged(DAGImpl dag, DAGState dagState) {
+      try {
+        // The edges map is a private field of the DAG; its element type cannot be checked here.
+        @SuppressWarnings("unchecked")
+        Map<String, Edge> edges = (Map<String, Edge>) getPrivateField(dag, "edges");
+        for (Edge edge : edges.values()) {
+          // className is declared on the descriptor's superclass, hence getSuperclass().
+          OutputDescriptor outputDescriptor = edge.getEdgeProperty().getEdgeSource();
+          Field outputClassNameField =
+              outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
+          outputClassNameField.setAccessible(true);
+          String outputClassName = (String) outputClassNameField.get(outputDescriptor);
+          String rssOutputClassName = RssTezUtils.replaceRssOutputClassName(outputClassName);
+          outputClassNameField.set(outputDescriptor, rssOutputClassName);
+
+          // Resolve and use the input descriptor's own field rather than the output one.
+          InputDescriptor inputDescriptor = edge.getEdgeProperty().getEdgeDestination();
+          Field inputClassNameField =
+              inputDescriptor.getClass().getSuperclass().getDeclaredField("className");
+          inputClassNameField.setAccessible(true);
+          String inputClassName = (String) inputClassNameField.get(inputDescriptor);
+          String rssInputClassName = RssTezUtils.replaceRssInputClassName(inputClassName);
+          inputClassNameField.set(inputDescriptor, rssInputClassName);
+        }
+      } catch (Exception e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+  }
+
+  /**
+   * Reads the value of a field declared directly on the runtime class of {@code object},
+   * bypassing access checks.
+   *
+   * @param object the instance to read from
+   * @param name the declared field name
+   * @return the current value of the field
+   * @throws RssException wrapping any failure to look up or read the field
+   */
+  private static Object getPrivateField(Object object, String name) {
+    try {
+      Field declaredField = object.getClass().getDeclaredField(name);
+      declaredField.setAccessible(true);
+      Object fieldValue = declaredField.get(object);
+      return fieldValue;
+    } catch (Exception e) {
+      throw new RssException(e);
+    }
+  }
+
+  /**
+   * Re-applies the log4j configuration named by the system properties
+   * {@code DEFAULT_CONFIGURATION_KEY} and {@code CONFIGURATOR_CLASS_KEY}.
+   *
+   * <p>Does nothing when no configuration is named or when the named resource cannot be
+   * found, so the configuration log4j already has stays in effect.
+   */
+  private static void reconfigureLog4j() {
+    String configuratorClassName =
+        OptionConverter.getSystemProperty(CONFIGURATOR_CLASS_KEY, null);
+    String configurationOptionStr =
+        OptionConverter.getSystemProperty(DEFAULT_CONFIGURATION_KEY, null);
+    if (configurationOptionStr == null) {
+      // No configuration was requested through system properties; nothing to reload.
+      return;
+    }
+    URL url = Loader.getResource(configurationOptionStr);
+    if (url == null) {
+      // The named resource was not found; never pass a null URL on to log4j.
+      // NOTE(review): the value is only looked up as a resource here, not as a URL - confirm.
+      return;
+    }
+    OptionConverter.selectAndConfigure(url, configuratorClassName, LogManager.getLoggerRepository());
+  }
 }
diff --git 
a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java 
b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
index 997ad78a..72023953 100644
--- a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
+++ b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
@@ -19,12 +19,61 @@ package org.apache.tez.dag.app;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.input.RssOrderedGroupedKVInput;
+import org.apache.tez.runtime.library.input.RssUnorderedKVInput;
+import org.apache.tez.runtime.library.output.RssOrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.RssUnorderedKVOutput;
+import org.apache.tez.runtime.library.output.RssUnorderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class RssDAGAppMasterTest {
 
@@ -57,4 +106,174 @@ public class RssDAGAppMasterTest {
       assertEquals(originalResources.get(i), newResources.get(i));
     }
   }
+
+  /**
+   * Checks that the callback registered by {@code RssDAGAppMaster.registerStateEnteredCallback}
+   * has replaced the edge input/output class names with the RSS ones once the DAG is INITED.
+   */
+  @Test
+  public void testRenameRssIOClassName() throws Exception {
+    // 1 Init and mock some basic modules
+    AppContext appContext = mock(AppContext.class);
+    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    HadoopShim defaultShim = new DefaultHadoopShim();
+    when(appContext.getHadoopShim()).thenReturn(defaultShim);
+    when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());
+    ClusterInfo clusterInfo = new ClusterInfo();
+    Resource maxCapability = Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE);
+    clusterInfo.setMaxContainerCapability(maxCapability);
+    when(appContext.getClusterInfo()).thenReturn(clusterInfo);
+    HistoryEventHandler historyEventHandler = mock(HistoryEventHandler.class);
+    doReturn(historyEventHandler).when(appContext).getHistoryHandler();
+    ACLManager aclManager = new ACLManager("amUser");
+    doReturn(aclManager).when(appContext).getAMACLManager();
+
+    // 2 Init the dispatcher
+    AsyncDispatcher dispatcher = new AsyncDispatcher("core");
+
+    // 3 Init the DAG
+    Configuration conf = new Configuration();
+    DAG dag = createDAG("test", conf);
+    TezDAGID dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
+    DAGProtos.DAGPlan dagPlan = dag.createDag(conf, null, null, null, false, null, null);
+    DAGImpl dagImpl = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(), null,
+        new Credentials(), new SystemClock(), "user", null, appContext);
+    when(appContext.getCurrentDAG()).thenReturn(dagImpl);
+
+    // 4 Register the callback function
+    RssDAGAppMaster.registerStateEnteredCallback(dagImpl);
+
+    // 5 Route DAG events to the DAG, then init and start the dispatcher
+    EventHandler<DAGEvent> dagEventDispatcher = dagImpl::handle;
+    dispatcher.register(DAGEventType.class, dagEventDispatcher);
+    dispatcher.init(conf);
+    dispatcher.start();
+
+    // 6 Send DAG_INIT to the dispatcher
+    DAGEvent initEvent = new DAGEvent(dagImpl.getID(), DAGEventType.DAG_INIT);
+    dispatcher.getEventHandler().handle(initEvent);
+
+    // 7 Wait for DAGImpl to transition to the INITED state
+    await().atMost(2, TimeUnit.SECONDS).until(() -> dagImpl.getState().equals(DAGState.INITED));
+
+    // 8 Verify the I/O class names of each vertex
+    verfiyOutput(dagImpl, "vertex1", RssOrderedPartitionedKVOutput.class.getName());
+    verfiyInput(dagImpl, "vertex2", RssOrderedGroupedKVInput.class.getName());
+    verfiyOutput(dagImpl, "vertex2", RssUnorderedKVOutput.class.getName());
+    verfiyInput(dagImpl, "vertex3", RssUnorderedKVInput.class.getName());
+    verfiyOutput(dagImpl, "vertex3", RssUnorderedPartitionedKVOutput.class.getName());
+    verfiyInput(dagImpl, "vertex4", RssUnorderedKVInput.class.getName());
+  }
+
+  public static void verfiyInput(DAGImpl dag, String name, String 
expectedInputClassName) throws AMUserCodeException {
+    List<InputSpec> inputSpecs = dag.getVertex(name).getInputSpecList(0);
+    Assertions.assertEquals(1, inputSpecs.size());
+    Assertions.assertEquals(expectedInputClassName, 
inputSpecs.get(0).getInputDescriptor().getClassName());
+  }
+
+  public static void verfiyOutput(DAGImpl dag, String name, String 
expectedOutputClassName) throws AMUserCodeException {
+    List<OutputSpec> outputSpecs = dag.getVertex(name).getOutputSpecList(0);
+    Assertions.assertEquals(1, outputSpecs.size());
+    Assertions.assertEquals(expectedOutputClassName, 
outputSpecs.get(0).getOutputDescriptor().getClassName());
+  }
+
+  /**
+   * Builds a four-vertex chain (vertex1 -> vertex2 -> vertex3 -> vertex4) whose three edges
+   * use ordered-partitioned, unordered and unordered-partitioned KV edge configs in turn.
+   */
+  private static DAG createDAG(String dagName, Configuration conf) {
+    String keyClass = NullWritable.class.getName();
+    String valueClass = NullWritable.class.getName();
+    String partitionerClass = HashPartitioner.class.getName();
+    String processorClass = DummyOp.class.getName();
+
+    DataSourceDescriptor dummyInput = DataSourceDescriptor.create(
+        InputDescriptor.create("dummyclass"), InputInitializerDescriptor.create(""), null);
+
+    EdgeManagerPluginDescriptor cpEdgeManager =
+        EdgeManagerPluginDescriptor.create(DummyProductEdgeManager.class.getName());
+
+    Vertex vertex1 = Vertex.create("vertex1", ProcessorDescriptor.create(processorClass));
+    Vertex vertex2 = Vertex.create("vertex2", ProcessorDescriptor.create(processorClass));
+    Vertex vertex3 = Vertex.create("vertex3", ProcessorDescriptor.create(processorClass));
+    Vertex vertex4 = Vertex.create("vertex4", ProcessorDescriptor.create(processorClass));
+
+    vertex1.addDataSource("dummyInput", dummyInput);
+    OrderedPartitionedKVEdgeConfig edgeConf12 = OrderedPartitionedKVEdgeConfig
+        .newBuilder(keyClass, valueClass, partitionerClass)
+        .setFromConfiguration(conf)
+        .build();
+    UnorderedKVEdgeConfig edgeConf23 =
+        UnorderedKVEdgeConfig.newBuilder(keyClass, valueClass).build();
+    UnorderedPartitionedKVEdgeConfig edgeConf34 = UnorderedPartitionedKVEdgeConfig
+        .newBuilder(keyClass, valueClass, partitionerClass)
+        .setFromConfiguration(conf)
+        .build();
+
+    Edge edge12 = Edge.create(vertex1, vertex2, edgeConf12.createDefaultCustomEdgeProperty(cpEdgeManager));
+    Edge edge23 = Edge.create(vertex2, vertex3, edgeConf23.createDefaultCustomEdgeProperty(cpEdgeManager));
+    Edge edge34 = Edge.create(vertex3, vertex4, edgeConf34.createDefaultCustomEdgeProperty(cpEdgeManager));
+
+    DAG dag = DAG.create(dagName);
+    dag.addVertex(vertex1)
+        .addVertex(vertex2)
+        .addVertex(vertex3)
+        .addVertex(vertex4)
+        .addEdge(edge12)
+        .addEdge(edge23)
+        .addEdge(edge34);
+    return dag;
+  }
+
+  /** Processor used as a placeholder for every vertex of the test DAG; it does no work. */
+  public static class DummyOp extends SimpleProcessor {
+
+    public DummyOp(ProcessorContext context) {
+      super(context);
+    }
+
+    /** Intentionally empty: the test never relies on processor output. */
+    @Override
+    public void run() {
+      // no-op
+    }
+  }
+
+  public static class DummyProductEdgeManager extends 
EdgeManagerPluginOnDemand {
+
+    public DummyProductEdgeManager(EdgeManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+    }
+
+    @Override
+    public void prepareForRouting() throws Exception {
+    }
+
+    @Override
+    public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) 
throws Exception {
+      return 1;
+    }
+
+    @Override
+    public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws 
Exception {
+      return 1;
+    }
+
+    @Override
+    public int getNumDestinationConsumerTasks(int sourceTaskIndex) throws 
Exception {
+      return 1;
+    }
+
+    @Override
+    public int routeInputErrorEventToSource(int destinationTaskIndex, int 
destinationFailedInputIndex)
+        throws Exception {
+      return 1;
+    }
+
+    @Nullable
+    @Override
+    public EventRouteMetadata routeDataMovementEventToDestination(int 
sourceTaskIndex, int sourceOutputIndex,
+        int destinationTaskIndex) throws Exception {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public CompositeEventRouteMetadata 
routeCompositeDataMovementEventToDestination(int sourceTaskIndex,
+        int destinationTaskIndex) throws Exception {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int 
sourceTaskIndex, 
+        int destinationTaskIndex) throws Exception {
+      return null;
+    }
+  }
 }
diff --git a/client-tez/src/test/resources/log4j.properties 
b/client-tez/src/test/resources/log4j.properties
new file mode 100644
index 00000000..946ffcc2
--- /dev/null
+++ b/client-tez/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Send everything at INFO and above to the console during tests.
+log4j.rootLogger=info,stdout
+# Repository-wide threshold; the key must be spelled "threshold" for log4j to read it.
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n

Reply via email to