This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b9e99ead [#965] feat(tez): support remote shuffle for tez framework
(#966)
b9e99ead is described below
commit b9e99ead382be19b115f658ddd99b923554f6ad1
Author: zhengchenyu <[email protected]>
AuthorDate: Thu Jun 29 17:06:49 2023 +0800
[#965] feat(tez): support remote shuffle for tez framework (#966)
### What changes were proposed in this pull request?
A method to support remote shuffle for tez framework.
### How was this patch tested?
test in real cluster and integration test.
---
.../java/org/apache/tez/common/RssTezUtils.java | 68 +++++++
.../org/apache/tez/dag/app/RssDAGAppMaster.java | 112 +++++++++--
.../apache/tez/dag/app/RssDAGAppMasterTest.java | 219 +++++++++++++++++++++
client-tez/src/test/resources/log4j.properties | 22 +++
4 files changed, 407 insertions(+), 14 deletions(-)
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index ed4df3e8..c980ddb8 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -27,6 +27,24 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
+import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
+import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
+import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
+import org.apache.tez.runtime.library.input.RssConcatenatedMergedKeyValueInput;
+import
org.apache.tez.runtime.library.input.RssConcatenatedMergedKeyValuesInput;
+import org.apache.tez.runtime.library.input.RssOrderedGroupedInputLegacy;
+import org.apache.tez.runtime.library.input.RssOrderedGroupedKVInput;
+import org.apache.tez.runtime.library.input.RssOrderedGroupedMergedKVInput;
+import org.apache.tez.runtime.library.input.RssUnorderedKVInput;
+import org.apache.tez.runtime.library.input.UnorderedKVInput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.RssOrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.RssUnorderedKVOutput;
+import org.apache.tez.runtime.library.output.RssUnorderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -325,4 +343,54 @@ public class RssTezUtils {
}
}
}
+
+ public static String replaceRssOutputClassName(String className) {
+ if (className.equals(OrderedPartitionedKVOutput.class.getName())) {
+ LOG.info("Output class name will transient from {} to {}", className,
+ RssOrderedPartitionedKVOutput.class.getName());
+ return RssOrderedPartitionedKVOutput.class.getName();
+ } else if (className.equals(UnorderedKVOutput.class.getName())) {
+ LOG.info("Output class name will transient from {} to {}", className,
+ RssUnorderedKVOutput.class.getName());
+ return RssUnorderedKVOutput.class.getName();
+ } else if (className.equals(UnorderedPartitionedKVOutput.class.getName()))
{
+ LOG.info("Output class name will transient from {} to {}", className,
+ RssUnorderedPartitionedKVOutput.class.getName());
+ return RssUnorderedPartitionedKVOutput.class.getName();
+ } else {
+ LOG.info("Unexpected kv output class name {}.", className);
+ return className;
+ }
+ }
+
+ public static String replaceRssInputClassName(String className) {
+ if (className.equals(OrderedGroupedKVInput.class.getName())) {
+ LOG.info("Input class name will transient from {} to {}", className,
+ RssOrderedGroupedKVInput.class.getName());
+ return RssOrderedGroupedKVInput.class.getName();
+ } else if (className.equals(OrderedGroupedMergedKVInput.class.getName())) {
+ LOG.info("Input class name will transient from {} to {}", className,
+ RssOrderedGroupedMergedKVInput.class.getName());
+ return RssOrderedGroupedMergedKVInput.class.getName();
+ } else if (className.equals(OrderedGroupedInputLegacy.class.getName())) {
+ LOG.info("Input class name will transient from {} to {}", className,
+ RssOrderedGroupedInputLegacy.class.getName());
+ return RssOrderedGroupedInputLegacy.class.getName();
+ } else if (className.equals(UnorderedKVInput.class.getName())) {
+ LOG.info("Input class name will transient from {} to {}", className,
+ RssUnorderedKVInput.class.getName());
+ return RssUnorderedKVInput.class.getName();
+ } else if
(className.equals(ConcatenatedMergedKeyValueInput.class.getName())) {
+ LOG.info("Input class name will transient from {} to {}", className,
+ RssConcatenatedMergedKeyValueInput.class.getName());
+ return RssConcatenatedMergedKeyValueInput.class.getName();
+ } else if
(className.equals(ConcatenatedMergedKeyValuesInput.class.getName())) {
+ LOG.info("Input class name will transient from {} to {}", className,
+ RssConcatenatedMergedKeyValuesInput.class.getName());
+ return RssConcatenatedMergedKeyValuesInput.class.getName();
+ } else {
+ LOG.info("Unexpected kv input class name {}.", className);
+ return className;
+ }
+ }
}
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index 16086897..cb6146bf 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app;
import java.lang.reflect.Field;
+import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -26,9 +27,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Options;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -48,18 +47,30 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.helpers.Loader;
+import org.apache.log4j.helpers.OptionConverter;
import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.common.TezClassLoader;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.VersionInfo;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +78,8 @@ import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ThreadUtils;
+import static org.apache.log4j.LogManager.CONFIGURATOR_CLASS_KEY;
+import static org.apache.log4j.LogManager.DEFAULT_CONFIGURATION_KEY;
import static org.apache.tez.common.TezCommonUtils.TEZ_SYSTEM_SUB_DIR;
public class RssDAGAppMaster extends DAGAppMaster {
@@ -158,6 +171,13 @@ public class RssDAGAppMaster extends DAGAppMaster {
mayCloseTezSlowStart(conf);
}
+ @Override
+ protected DAG createDAG(DAGProtos.DAGPlan dagPB) {
+ DAGImpl dag = createDAG(dagPB, null);
+ registerStateEnteredCallback(dag);
+ return dag;
+ }
+
@Override
public String submitDAGToAppMaster(DAGProtos.DAGPlan dagPlan, Map<String,
LocalResource> additionalResources)
throws TezException {
@@ -206,6 +226,24 @@ public class RssDAGAppMaster extends DAGAppMaster {
*/
public static void main(String[] args) {
try {
+      // We use a trick to introduce RssDAGAppMaster via the config tez.am.launch.cmd-opts.
+      // This means properties set on the command line would be ignored, so we must reload them.
+ boolean sessionModeCliOption = false;
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].startsWith("-D")) {
+ String[] property = args[i].split("=");
+ if (property.length < 2) {
+ System.setProperty(property[0].substring(2), "");
+ } else {
+ System.setProperty(property[0].substring(2), property[1]);
+ }
+ } else if (args[i].contains("--session") || args[i].contains("-s")) {
+ sessionModeCliOption = true;
+ }
+ }
+      // The log4j config is loaded only in LogManager's static initializer, so we must reconfigure it here.
+ reconfigureLog4j();
+
// Install the tez class loader, which can be used add new resources
TezClassLoader.setupTezClassLoader();
Thread.setDefaultUncaughtExceptionHandler(new
YarnUncaughtExceptionHandler());
@@ -228,15 +266,7 @@ public class RssDAGAppMaster extends DAGAppMaster {
String jobUserName = System
.getenv(ApplicationConstants.Environment.USER.name());
- // Command line options
- Options opts = new Options();
- opts.addOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION,
- false, "Run Tez Application Master in Session mode");
-
- CommandLine cliParser = new GnuParser().parse(opts, args);
- boolean sessionModeCliOption =
cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
-
- LOG.info("Creating DAGAppMaster for "
+ LOG.info("Creating RssDAGAppMaster for "
+ "applicationId=" + applicationAttemptId.getApplicationId()
+ ", attemptNum=" + applicationAttemptId.getAttemptId()
+ ", AMContainerId=" + containerId
@@ -294,7 +324,7 @@ public class RssDAGAppMaster extends DAGAppMaster {
initAndStartRSSClient(appMaster, conf, applicationAttemptId);
initAndStartAppMaster(appMaster, conf);
} catch (Throwable t) {
- LOG.error("Error starting DAGAppMaster", t);
+ LOG.error("Error starting RssDAGAppMaster", t);
System.exit(1);
}
}
@@ -358,8 +388,62 @@ public class RssDAGAppMaster extends DAGAppMaster {
}
}
- RssDAGAppMaster.LOG.info("MRAppMaster received a signal. Signaling
RMCommunicator and JobHistoryEventHandler.");
+ RssDAGAppMaster.LOG.info(
+ "RssDAGAppMaster received a signal. Signaling RMCommunicator and
JobHistoryEventHandler.");
this.appMaster.stop();
}
}
+
+ @VisibleForTesting
+ public static void registerStateEnteredCallback(DAGImpl dag) {
+ StateMachineTez
+ stateMachine = (StateMachineTez) getPrivateField(dag, "stateMachine");
+ stateMachine.registerStateEnteredCallback(DAGState.INITED, new
DagInitialCallback());
+ }
+
+ static class DagInitialCallback implements OnStateChangedCallback<DAGState,
DAGImpl> {
+
+ @Override
+ public void onStateChanged(DAGImpl dag, DAGState dagState) {
+ try {
+ Map<String, Edge> edges = (Map<String, Edge>) getPrivateField(dag,
"edges");
+ for (Map.Entry<String, Edge> entry : edges.entrySet()) {
+ Edge edge = entry.getValue();
+
+ OutputDescriptor outputDescriptor =
edge.getEdgeProperty().getEdgeSource();
+ Field outputClassNameField =
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
+ outputClassNameField.setAccessible(true);
+ String outputClassName = (String)
outputClassNameField.get(outputDescriptor);
+ String rssOutputClassName =
RssTezUtils.replaceRssOutputClassName(outputClassName);
+ outputClassNameField.set(outputDescriptor, rssOutputClassName);
+
+ InputDescriptor inputDescriptor =
edge.getEdgeProperty().getEdgeDestination();
+ Field inputClassNameField =
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
+ inputClassNameField.setAccessible(true);
+ String inputClassName = (String)
outputClassNameField.get(inputDescriptor);
+ String rssInputClassName =
RssTezUtils.replaceRssInputClassName(inputClassName);
+ outputClassNameField.set(inputDescriptor, rssInputClassName);
+ }
+ } catch (Exception e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+ }
+
+ private static Object getPrivateField(Object object, String name) {
+ try {
+ Field f = object.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ return f.get(object);
+ } catch (Exception e) {
+ throw new RssException(e);
+ }
+ }
+
+ private static void reconfigureLog4j() {
+ String configuratorClassName =
OptionConverter.getSystemProperty(CONFIGURATOR_CLASS_KEY, null);
+ String configurationOptionStr =
OptionConverter.getSystemProperty(DEFAULT_CONFIGURATION_KEY, null);
+ URL url = Loader.getResource(configurationOptionStr);
+ OptionConverter.selectAndConfigure(url, configuratorClassName,
LogManager.getLoggerRepository());
+ }
}
diff --git
a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
index 997ad78a..72023953 100644
--- a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
+++ b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
@@ -19,12 +19,61 @@ package org.apache.tez.dag.app;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.input.RssOrderedGroupedKVInput;
+import org.apache.tez.runtime.library.input.RssUnorderedKVInput;
+import org.apache.tez.runtime.library.output.RssOrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.RssUnorderedKVOutput;
+import org.apache.tez.runtime.library.output.RssUnorderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class RssDAGAppMasterTest {
@@ -57,4 +106,174 @@ public class RssDAGAppMasterTest {
assertEquals(originalResources.get(i), newResources.get(i));
}
}
+
+ @Test
+ public void testRenameRssIOClassName() throws Exception {
+ // 1 Init and mock some basic module
+ AppContext appContext = mock(AppContext.class);
+ ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+ HadoopShim defaultShim = new DefaultHadoopShim();
+ when(appContext.getHadoopShim()).thenReturn(defaultShim);
+
when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());
+ ClusterInfo clusterInfo = new ClusterInfo();
+
clusterInfo.setMaxContainerCapability(Resource.newInstance(Integer.MAX_VALUE,
Integer.MAX_VALUE));
+ when(appContext.getClusterInfo()).thenReturn(clusterInfo);
+ HistoryEventHandler historyEventHandler = mock(HistoryEventHandler.class);
+ doReturn(historyEventHandler).when(appContext).getHistoryHandler();
+ ACLManager aclManager = new ACLManager("amUser");
+ doReturn(aclManager).when(appContext).getAMACLManager();
+
+ // 2 init dispatcher
+ AsyncDispatcher dispatcher = new AsyncDispatcher("core");
+
+ // 3 init dag
+ Configuration conf = new Configuration();
+ DAG dag = createDAG("test", conf);
+ TezDAGID dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
+ DAGProtos.DAGPlan dagPlan = dag.createDag(conf, null, null, null, false,
null, null);
+ DAGImpl dagImpl = new DAGImpl(dagId, conf, dagPlan,
dispatcher.getEventHandler(), null, new Credentials(),
+ new SystemClock(), "user", null, appContext);
+ when(appContext.getCurrentDAG()).thenReturn(dagImpl);
+
+ // 4 register call back function
+ RssDAGAppMaster.registerStateEnteredCallback(dagImpl);
+
+ // 5 register DAGEvent, init and start dispatcher
+ EventHandler<DAGEvent> dagEventDispatcher = new EventHandler<DAGEvent>() {
+ @Override
+ public void handle(DAGEvent event) {
+ dagImpl.handle(event);
+ }
+ };
+ dispatcher.register(DAGEventType.class, dagEventDispatcher);
+ dispatcher.init(conf);
+ dispatcher.start();
+
+ // 6 send DAG_INIT to dispatcher
+ dispatcher.getEventHandler().handle(new DAGEvent(dagImpl.getID(),
DAGEventType.DAG_INIT));
+
+ // 7 wait DAGImpl transient to INITED state
+ await().atMost(2, TimeUnit.SECONDS).until(() ->
dagImpl.getState().equals(DAGState.INITED));
+
+ // 8 verify I/O for vertexImpl
+ verfiyOutput(dagImpl, "vertex1",
RssOrderedPartitionedKVOutput.class.getName());
+ verfiyInput(dagImpl, "vertex2", RssOrderedGroupedKVInput.class.getName());
+ verfiyOutput(dagImpl, "vertex2", RssUnorderedKVOutput.class.getName());
+ verfiyInput(dagImpl, "vertex3", RssUnorderedKVInput.class.getName());
+ verfiyOutput(dagImpl, "vertex3",
RssUnorderedPartitionedKVOutput.class.getName());
+ verfiyInput(dagImpl, "vertex4", RssUnorderedKVInput.class.getName());
+ }
+
+ public static void verfiyInput(DAGImpl dag, String name, String
expectedInputClassName) throws AMUserCodeException {
+ List<InputSpec> inputSpecs = dag.getVertex(name).getInputSpecList(0);
+ Assertions.assertEquals(1, inputSpecs.size());
+ Assertions.assertEquals(expectedInputClassName,
inputSpecs.get(0).getInputDescriptor().getClassName());
+ }
+
+ public static void verfiyOutput(DAGImpl dag, String name, String
expectedOutputClassName) throws AMUserCodeException {
+ List<OutputSpec> outputSpecs = dag.getVertex(name).getOutputSpecList(0);
+ Assertions.assertEquals(1, outputSpecs.size());
+ Assertions.assertEquals(expectedOutputClassName,
outputSpecs.get(0).getOutputDescriptor().getClassName());
+ }
+
+ private static DAG createDAG(String dageName, Configuration conf) {
+ DataSourceDescriptor dummyInput = DataSourceDescriptor.create(
+ InputDescriptor.create("dummyclass"),
InputInitializerDescriptor.create(""), null);
+
+ EdgeManagerPluginDescriptor cpEdgeManager =
+
EdgeManagerPluginDescriptor.create(DummyProductEdgeManager.class.getName());
+
+ Vertex vertex1 = Vertex.create("vertex1",
ProcessorDescriptor.create(DummyOp.class.getName()));
+ Vertex vertex2 = Vertex.create("vertex2",
ProcessorDescriptor.create(DummyOp.class.getName()));
+ Vertex vertex3 = Vertex.create("vertex3",
ProcessorDescriptor.create(DummyOp.class.getName()));
+ Vertex vertex4 = Vertex.create("vertex4",
ProcessorDescriptor.create(DummyOp.class.getName()));
+
+ vertex1.addDataSource("dummyInput", dummyInput);
+ OrderedPartitionedKVEdgeConfig edgeConf12 =
+
OrderedPartitionedKVEdgeConfig.newBuilder(NullWritable.class.getName(),
NullWritable.class.getName(),
+
HashPartitioner.class.getName()).setFromConfiguration(conf).build();
+ UnorderedKVEdgeConfig edgeConf23 =
+ UnorderedKVEdgeConfig.newBuilder(NullWritable.class.getName(),
NullWritable.class.getName()).build();
+ UnorderedPartitionedKVEdgeConfig edgeConf34 =
+
UnorderedPartitionedKVEdgeConfig.newBuilder(NullWritable.class.getName(),
NullWritable.class.getName(),
+
HashPartitioner.class.getName()).setFromConfiguration(conf).build();
+
+ DAG dag = DAG.create(dageName);
+ dag.addVertex(vertex1)
+ .addVertex(vertex2)
+ .addVertex(vertex3)
+ .addVertex(vertex4)
+ .addEdge(Edge.create(vertex1, vertex2,
edgeConf12.createDefaultCustomEdgeProperty(cpEdgeManager)))
+ .addEdge(Edge.create(vertex2, vertex3,
edgeConf23.createDefaultCustomEdgeProperty(cpEdgeManager)))
+ .addEdge(Edge.create(vertex3, vertex4,
edgeConf34.createDefaultCustomEdgeProperty(cpEdgeManager)));
+ return dag;
+ }
+
+ public static class DummyOp extends SimpleProcessor {
+
+ public DummyOp(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() {
+ }
+ }
+
+ public static class DummyProductEdgeManager extends
EdgeManagerPluginOnDemand {
+
+ public DummyProductEdgeManager(EdgeManagerPluginContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ }
+
+ @Override
+ public void prepareForRouting() throws Exception {
+ }
+
+ @Override
+ public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex)
throws Exception {
+ return 1;
+ }
+
+ @Override
+ public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws
Exception {
+ return 1;
+ }
+
+ @Override
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) throws
Exception {
+ return 1;
+ }
+
+ @Override
+ public int routeInputErrorEventToSource(int destinationTaskIndex, int
destinationFailedInputIndex)
+ throws Exception {
+ return 1;
+ }
+
+ @Nullable
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(int
sourceTaskIndex, int sourceOutputIndex,
+ int destinationTaskIndex) throws Exception {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public CompositeEventRouteMetadata
routeCompositeDataMovementEventToDestination(int sourceTaskIndex,
+ int destinationTaskIndex) throws Exception {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int
sourceTaskIndex,
+ int destinationTaskIndex) throws Exception {
+ return null;
+ }
+ }
}
diff --git a/client-tez/src/test/resources/log4j.properties
b/client-tez/src/test/resources/log4j.properties
new file mode 100644
index 00000000..946ffcc2
--- /dev/null
+++ b/client-tez/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}
(%F:%M(%L)) - %m%n