Repository: asterixdb Updated Branches: refs/heads/master 92c0bac18 -> 34ee3335a
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 4ee34ca..4eb1732 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -100,6 +100,10 @@ public class CCNCFunctions { SHUTDOWN_REQUEST, SHUTDOWN_RESPONSE, + DISTRIBUTE_JOB, + DESTROY_JOB, + DISTRIBUTED_JOB_FAILURE, + STATE_DUMP_REQUEST, STATE_DUMP_RESPONSE, @@ -282,6 +286,31 @@ public class CCNCFunctions { } } + public static class ReportDistributedJobFailureFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + private final String nodeId; + + public ReportDistributedJobFailureFunction(JobId jobId, String nodeId) { + this.jobId = jobId; + this.nodeId = nodeId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.DISTRIBUTED_JOB_FAILURE; + } + + public JobId getJobId() { + return jobId; + } + + public String getNodeId() { + return nodeId; + } + } + public static class NotifyJobletCleanupFunction extends Function { private static final long serialVersionUID = 1L; @@ -670,6 +699,51 @@ public class CCNCFunctions { } } + public static class DistributeJobFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + private final byte[] acgBytes; + + public DistributeJobFunction(JobId jobId, byte[] acgBytes) { + this.jobId = jobId; + this.acgBytes = acgBytes; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.DISTRIBUTE_JOB; + } + + public JobId getJobId() { + return jobId; + } + + public byte[] getacgBytes() { + return acgBytes; + } + } + + public static class DestroyJobFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + public DestroyJobFunction(JobId jobId) { + this.jobId = jobId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.DESTROY_JOB; + } + + public JobId getJobId() { + return jobId; + } + } + public static class StartTasksFunction extends Function { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index ac6fc2c..83ef32b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -141,6 +141,13 @@ public class ClusterControllerRemoteProxy implements IClusterController { } @Override + public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception { + CCNCFunctions.ReportDistributedJobFailureFunction fn = + new CCNCFunctions.ReportDistributedJobFailureFunction(jobId, nodeId); + ipcHandle.send(-1, fn, null); + } + + @Override public void getNodeControllerInfos() throws Exception { ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index 0d59b8d..2a8464e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -84,6 +84,18 @@ public class NodeControllerRemoteProxy implements INodeController { } @Override + public void distributeJob(JobId jobId, byte[] planBytes) throws Exception { + CCNCFunctions.DistributeJobFunction fn = new CCNCFunctions.DistributeJobFunction(jobId, planBytes); + ipcHandle.send(-1, fn, null); + } + + @Override + public void destroyJob(JobId jobId) throws Exception { + CCNCFunctions.DestroyJobFunction fn = new CCNCFunctions.DestroyJobFunction(jobId); + ipcHandle.send(-1, fn, null); + } + + @Override public void dumpState(String stateDumpId) throws Exception { CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId); ipcHandle.send(-1, dsf, null); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index 93ccaa4..e6278be 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -26,6 +26,8 @@ import org.apache.hyracks.control.nc.work.AbortTasksWork; import org.apache.hyracks.control.nc.work.ApplicationMessageWork; import org.apache.hyracks.control.nc.work.CleanupJobletWork; import org.apache.hyracks.control.nc.work.DeployBinaryWork; +import org.apache.hyracks.control.nc.work.DestroyJobWork; +import org.apache.hyracks.control.nc.work.DistributeJobWork; import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork; import org.apache.hyracks.control.nc.work.StartTasksWork; import org.apache.hyracks.control.nc.work.StateDumpWork; @@ -99,6 +101,16 @@ final class NodeControllerIPCI implements IIPCI { ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId())); return; + case DISTRIBUTE_JOB: + CCNCFunctions.DistributeJobFunction djf = (CCNCFunctions.DistributeJobFunction) fn; + ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, djf.getJobId(), djf.getacgBytes())); + return; + + case DESTROY_JOB: + CCNCFunctions.DestroyJobFunction dsjf = (CCNCFunctions.DestroyJobFunction) fn; + ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, dsjf.getJobId())); + return; + case STATE_DUMP_REQUEST: final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn; ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId())); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 19f01c1..bf0ddb6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -46,6 +46,8 @@ import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.JobId; @@ -94,11 +96,11 @@ public class NodeControllerService implements IControllerService { private final IOManager ioManager; - private final IPCSystem ipc; + private IPCSystem ipc; - private final PartitionManager partitionManager; + private PartitionManager partitionManager; - private final NetworkManager netManager; + private NetworkManager netManager; private IDatasetPartitionManager datasetPartitionManager; @@ -155,18 +157,11 @@ public class NodeControllerService implements IControllerService { public NodeControllerService(NCConfig ncConfig) throws Exception { this.ncConfig = ncConfig; id = ncConfig.nodeId; - ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), - new NodeControllerIPCI(this), - new CCNCFunctions.SerializerDeserializer()); ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.ioDevices)); if (id == null) { throw new Exception("id not set"); } - partitionManager = new PartitionManager(this); - netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, - ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort, - FullFrameChannelInterfaceFactory.INSTANCE); lccm = new LifeCycleComponentManager(); workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. @@ -244,7 +239,13 @@ public class NodeControllerService implements IControllerService { @Override public void start() throws Exception { LOGGER.log(Level.INFO, "Starting NodeControllerService"); + ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), + new NodeControllerIPCI(this), new CCNCFunctions.SerializerDeserializer()); ipc.start(); + partitionManager = new PartitionManager(this); + netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, + ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort, + FullFrameChannelInterfaceFactory.INSTANCE); netManager.start(); startApplication(); @@ -365,8 +366,28 @@ public class NodeControllerService implements IControllerService { return jobletMap; } - public Map<JobId, ActivityClusterGraph> getActivityClusterGraphMap() { - return activityClusterGraphMap; + public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException { + if (activityClusterGraphMap.get(jobId) != null) { + throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); + } + activityClusterGraphMap.put(jobId, acg); + } + + public void removeActivityClusterGraph(JobId jobId) throws HyracksException { + if (activityClusterGraphMap.get(jobId) == null) { + throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); + } + activityClusterGraphMap.remove(jobId); + } + + public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException { + if (activityClusterGraphMap.get(jobId) != null) { + throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); + } + } + + public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException { + return activityClusterGraphMap.get(jobId); } public NetworkManager getNetworkManager() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java new file mode 100644 index 0000000..55dd01e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.control.nc.work; + +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.nc.NodeControllerService; + +/** + * destroy a pre-distributed job + * + */ +public class DestroyJobWork extends AbstractWork { + + private final NodeControllerService ncs; + private final JobId jobId; + + public DestroyJobWork(NodeControllerService ncs, JobId jobId) { + this.ncs = ncs; + this.jobId = jobId; + } + + @Override + public void run() { + try { + ncs.removeActivityClusterGraph(jobId); + } catch (HyracksException e) { + try { + ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId()); + } catch (Exception e1) { + e1.printStackTrace(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java new file mode 100644 index 0000000..3a4f6ac --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.control.nc.work; + +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.common.deployment.DeploymentUtils; +import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.nc.NodeControllerService; + +/** + * pre-distribute a job that can be executed later + * + */ +public class DistributeJobWork extends AbstractWork { + + private final NodeControllerService ncs; + private final byte[] acgBytes; + private final JobId jobId; + + public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] acgBytes) { + this.ncs = ncs; + this.jobId = jobId; + this.acgBytes = acgBytes; + } + + @Override + public void run() { + try { + ncs.checkForDuplicateDistributedJob(jobId); + ActivityClusterGraph acg = + (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getApplicationContext()); + ncs.storeActivityClusterGraph(jobId, acg); + } catch (HyracksException e) { + try { + ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId()); + } catch (Exception e1) { + e1.printStackTrace(); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index 803f15a..6cd9fa2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -46,6 +46,7 @@ import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.ActivityCluster; @@ -186,17 +187,12 @@ public class StartTasksWork extends AbstractWork { Map<JobId, Joblet> jobletMap = ncs.getJobletMap(); Joblet ji = jobletMap.get(jobId); if (ji == null) { - Map<JobId, ActivityClusterGraph> acgMap = ncs.getActivityClusterGraphMap(); - ActivityClusterGraph acg = acgMap.get(jobId); + ActivityClusterGraph acg = ncs.getActivityClusterGraph(jobId); if (acg == null) { if (acgBytes == null) { - throw new HyracksException("Joblet was not found. This job was most likely aborted."); + throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); } acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx); - if (flags.contains(JobFlag.STORE_JOB)) { - //TODO: Right now the map is append-only - acgMap.put(jobId, acg); - } } ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg); jobletMap.put(jobId, ji); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java index b51a578..a7677f8 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java @@ -63,10 +63,10 @@ public abstract class AbstractIntegrationTest { public static final String NC1_ID = "nc1"; public static final String NC2_ID = "nc2"; - private static ClusterControllerService cc; + protected static ClusterControllerService cc; protected static NodeControllerService nc1; protected static NodeControllerService nc2; - private static IHyracksClientConnection hcc; + protected static IHyracksClientConnection hcc; private final List<File> outputFiles; private static AtomicInteger aInteger = new AtomicInteger(0); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java index efbb9d2..160336a 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java @@ -114,6 +114,11 @@ public class HeapSortMergeTest extends AbstractIntegrationTest { @Test public void optimizedSortMergeTest02() throws Exception { + JobSpecification spec = createSortMergeJobSpec(); + runTest(spec); + } + + public static JobSpecification createSortMergeJobSpec() throws Exception { JobSpecification spec = new JobSpecification(); FileSplit[] ordersSplits = new FileSplit[] { @@ -156,19 +161,17 @@ public class HeapSortMergeTest extends AbstractIntegrationTest { spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0); - spec.connect( - new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] { - 1, 0 }, new IBinaryHashFunctionFactory[] { - PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY), - PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 }, - new IBinaryComparatorFactory[] { - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0); + spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory( + new int[] { 1, 0 }, + new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY), + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), + new int[] { 1, 0 }, + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), + PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, + new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0); spec.connect(new OneToOneConnectorDescriptor(spec), filter, 0, printer, 0); - - runTest(spec); + return spec; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java new file mode 100644 index 0000000..2509515 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.tests.integration; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; + +import java.io.File; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.commons.io.FileUtils; +import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +public class PredistributedJobsTest { + private static final Logger LOGGER = Logger.getLogger(PredistributedJobsTest.class.getName()); + + private static final String NC1_ID = "nc1"; + private static final String NC2_ID = "nc2"; + + private static ClusterControllerService cc; + private static NodeControllerService nc1; + private static NodeControllerService nc2; + private static IHyracksClientConnection hcc; + + @BeforeClass + public static void init() throws Exception { + CCConfig ccConfig = new CCConfig(); + ccConfig.clientNetIpAddress = "127.0.0.1"; + ccConfig.clientNetPort = 39000; + ccConfig.clusterNetIpAddress = "127.0.0.1"; + ccConfig.clusterNetPort = 39001; + ccConfig.profileDumpPeriod = 10000; + FileUtils.deleteQuietly(new File("target" + File.separator + "data")); + FileUtils.copyDirectory(new File("data"), new File("target" + File.separator + "data")); + File outDir = new File("target" + File.separator + "ClusterController"); + outDir.mkdirs(); + File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir); + ccRoot.delete(); + ccRoot.mkdir(); + ccConfig.ccRoot = ccRoot.getAbsolutePath(); + ClusterControllerService ccBase = new ClusterControllerService(ccConfig); + cc = Mockito.spy(ccBase); + cc.start(); + + NCConfig ncConfig1 = new NCConfig(); + ncConfig1.ccHost = "localhost"; + ncConfig1.ccPort = 39001; + ncConfig1.clusterNetIPAddress = "127.0.0.1"; + ncConfig1.dataIPAddress = "127.0.0.1"; + ncConfig1.resultIPAddress = "127.0.0.1"; + ncConfig1.nodeId = NC1_ID; + ncConfig1.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data" + + File.separator + "device0"; + NodeControllerService nc1Base = new NodeControllerService(ncConfig1); + nc1 = Mockito.spy(nc1Base); + nc1.start(); + + NCConfig ncConfig2 = new NCConfig(); + ncConfig2.ccHost = "localhost"; + ncConfig2.ccPort = 39001; + ncConfig2.clusterNetIPAddress = "127.0.0.1"; + ncConfig2.dataIPAddress = "127.0.0.1"; + ncConfig2.resultIPAddress = "127.0.0.1"; + ncConfig2.nodeId = NC2_ID; + ncConfig2.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data" + + File.separator + "device1"; + NodeControllerService nc2Base = new NodeControllerService(ncConfig2); + nc2 = Mockito.spy(nc2Base); + nc2.start(); + + hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath()); + } + } + + @Test + public void DistributedTest() throws Exception { + JobSpecification spec1 = UnionTest.createUnionJobSpec(); + JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec(); + + //distribute both jobs + JobId jobId1 = hcc.distributeJob(spec1); + JobId jobId2 = hcc.distributeJob(spec2); + + //make sure it finished + //cc will get the store once to check for duplicate insertion and once to insert per job + verify(cc, Mockito.timeout(5000).times(4)).getPreDistributedJobStore(); + verify(nc1, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any()); + verify(nc2, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any()); + verify(nc1, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any()); + verify(nc2, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any()); + + //confirm that both jobs are distributed + Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) != null && nc2.getActivityClusterGraph(jobId1) != null); + Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) != null && nc2.getActivityClusterGraph(jobId2) != null); + Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId1) != null); + Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2) != null); + + //run the first job + hcc.startJob(jobId1); + hcc.waitForCompletion(jobId1); + + //destroy the first job + hcc.destroyJob(jobId1); + + //make sure it finished + verify(cc, Mockito.timeout(5000).times(8)).getPreDistributedJobStore(); + verify(nc1, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any()); + verify(nc2, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any()); + + //confirm the first job is destroyed + Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) == null && nc2.getActivityClusterGraph(jobId1) == null); + cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1); + + //run the second job + hcc.startJob(jobId2); + hcc.waitForCompletion(jobId2); + + //run the second job again + hcc.startJob(jobId2); + hcc.waitForCompletion(jobId2); + + //destroy the second job + hcc.destroyJob(jobId2); + + //make sure it finished + verify(cc, Mockito.timeout(5000).times(12)).getPreDistributedJobStore(); + verify(nc1, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any()); + verify(nc2, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any()); + + //confirm the second job is destroyed + Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) == null && nc2.getActivityClusterGraph(jobId2) == null); + cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId2); + } + + @AfterClass + public static void deinit() throws Exception { + nc2.stop(); + nc1.stop(); + cc.stop(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java index 02cab8f..542f037 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java @@ -44,6 +44,11 @@ import org.junit.Test; public class UnionTest extends AbstractIntegrationTest { @Test public void union01() throws Exception { + JobSpecification spec = createUnionJobSpec(); + runTest(spec); + } + + public static JobSpecification createUnionJobSpec() throws Exception { JobSpecification spec = new JobSpecification(); IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { @@ -82,6 +87,6 @@ public class UnionTest extends AbstractIntegrationTest { spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, printer, 0); spec.addRoot(printer); - runTest(spec); + return spec; } }
