Repository: asterixdb Updated Branches: refs/heads/master 51e381277 -> da7e8a16d
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/org/apache/hyracks/examples/shutdown/test/ClusterShutdownIT.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/org/apache/hyracks/examples/shutdown/test/ClusterShutdownIT.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/org/apache/hyracks/examples/shutdown/test/ClusterShutdownIT.java index 7c85d5a..83585d1 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/org/apache/hyracks/examples/shutdown/test/ClusterShutdownIT.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/org/apache/hyracks/examples/shutdown/test/ClusterShutdownIT.java @@ -20,9 +20,9 @@ package org.apache.hyracks.examples.shutdown.test; import java.net.ServerSocket; -import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.ipc.exceptions.IPCException; +import org.apache.hyracks.ipc.impl.HyracksConnection; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml index f412499..c00ffc1 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml @@ -62,6 +62,11 @@ <artifactId>hyracks-data-std</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-ipc</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java index 23a6be0..2004be9 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java @@ -21,7 +21,6 @@ package org.apache.hyracks.examples.text.client; import java.io.File; import java.util.EnumSet; -import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; @@ -63,6 +62,7 @@ import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOpera import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor; import org.apache.hyracks.examples.text.WordTupleParserFactory; +import org.apache.hyracks.ipc.impl.HyracksConnection; import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml index e1e2006..31bcf2e 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml @@ -56,6 +56,11 @@ <artifactId>hyracks-api</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-ipc</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java index 80c4f88..cb9307b 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java @@ -23,7 +23,6 @@ import static org.apache.hyracks.examples.tpch.client.Common.lineitemDesc; import static org.apache.hyracks.examples.tpch.client.Common.lineitemParserFactories; import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits; -import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; @@ -58,6 +57,7 @@ import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFa import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory; import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor; import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor; +import org.apache.hyracks.ipc.impl.HyracksConnection; import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java index c3d0df1..a0d40ee 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java @@ -25,7 +25,6 @@ import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits; import java.util.EnumSet; -import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; @@ -65,6 +64,7 @@ import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory; import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor; +import org.apache.hyracks.ipc.impl.HyracksConnection; import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java index 5043974..08d1031 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java @@ -19,11 +19,13 @@ package org.apache.hyracks.examples.tpch.client; -import static org.apache.hyracks.examples.tpch.client.Common.*; +import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint; +import static org.apache.hyracks.examples.tpch.client.Common.orderParserFactories; +import static org.apache.hyracks.examples.tpch.client.Common.ordersDesc; +import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits; import java.util.EnumSet; -import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; @@ -50,6 +52,7 @@ import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor; import org.apache.hyracks.dataflow.std.sort.Algorithm; import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; import org.apache.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor; +import org.apache.hyracks.ipc.impl.HyracksConnection; import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml index cba83e3..1a6422b 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml @@ -189,6 +189,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-ipc</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java index 9633fb1..bc187f8 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java @@ -33,7 +33,6 @@ import java.util.Random; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapred.InputSplit; -import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -41,6 +40,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.topology.ClusterTopology; import org.apache.hyracks.hdfs.api.INcCollection; import org.apache.hyracks.hdfs.api.INcCollectionBuilder; +import org.apache.hyracks.ipc.impl.HyracksConnection; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java index 8f96bab..c2ba188 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java @@ -34,7 +34,6 @@ import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; @@ -54,6 +53,7 @@ import org.apache.hyracks.hdfs.lib.TextKeyValueParserFactory; import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory; import org.apache.hyracks.hdfs.scheduler.Scheduler; import org.apache.hyracks.hdfs.utils.HyracksUtils; +import org.apache.hyracks.ipc.impl.HyracksConnection; import org.apache.hyracks.test.support.TestUtils; import org.apache.hyracks.util.file.FileUtil; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java index 1fddc46..17cd793 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java @@ -21,7 +21,7 @@ package org.apache.hyracks.hdfs.utils; import java.util.EnumSet; -import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.ipc.impl.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java index 02c0a20..04fdc85 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java @@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.ipc.impl.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml index d014f3b..4db3418 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml @@ -54,5 +54,18 @@ <artifactId>hyracks-util</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java new file mode 100644 index 0000000..a61c96d --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.impl; + +import java.io.Serializable; +import java.net.URL; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.result.ResultDirectoryRecord; +import org.apache.hyracks.api.result.ResultSetId; + +public class HyracksClientInterfaceFunctions { + public enum FunctionId { + GET_CLUSTER_CONTROLLER_INFO, + GET_CLUSTER_TOPOLOGY, + GET_JOB_STATUS, + GET_JOB_INFO, + START_JOB, + DEPLOY_JOB, + UNDEPLOY_JOB, + REDEPLOY_JOB, + CANCEL_JOB, + GET_RESULT_DIRECTORY_ADDRESS, + GET_RESULT_STATUS, + GET_RESULT_LOCATIONS, + WAIT_FOR_COMPLETION, + GET_NODE_CONTROLLERS_INFO, + CLI_DEPLOY_BINARY, + CLI_UNDEPLOY_BINARY, + CLUSTER_SHUTDOWN, + GET_NODE_DETAILS_JSON, + THREAD_DUMP + } + + public abstract static class Function implements Serializable { + private static final long serialVersionUID = 1L; + + public abstract FunctionId getFunctionId(); + } + + public static class GetClusterControllerInfoFunction extends Function { + private static final long serialVersionUID = 1L; + + @Override + public FunctionId getFunctionId() { + return FunctionId.GET_CLUSTER_CONTROLLER_INFO; + } + } + + public static class GetJobStatusFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + public GetJobStatusFunction(JobId jobId) { + this.jobId = jobId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.GET_JOB_STATUS; + } + + public JobId getJobId() { + return jobId; + } + } + + public static class GetJobInfoFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + public GetJobInfoFunction(JobId jobId) { + this.jobId = jobId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.GET_JOB_INFO; + } + + public JobId getJobId() { + return jobId; + } + } + + public static class redeployJobSpecFunction extends Function { + private static final long serialVersionUID = 1L; + + private final byte[] acggfBytes; + + private final DeployedJobSpecId deployedJobSpecId; + + public redeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) { + this.deployedJobSpecId = deployedJobSpecId; + this.acggfBytes = acggfBytes; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.REDEPLOY_JOB; + } + + public byte[] getACGGFBytes() { + return acggfBytes; + } + + public DeployedJobSpecId getDeployedJobSpecId() { + return deployedJobSpecId; + } + } + + public static class DeployJobSpecFunction extends Function { + private static final long serialVersionUID = 1L; + + private final byte[] acggfBytes; + + public DeployJobSpecFunction(byte[] acggfBytes) { + this.acggfBytes = acggfBytes; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.DEPLOY_JOB; + } + + public byte[] getACGGFBytes() { + return acggfBytes; + } + } + + public static class CancelJobFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + public CancelJobFunction(JobId jobId) { + this.jobId = jobId; + if (jobId == null) { + throw new IllegalArgumentException("jobId"); + } + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.CANCEL_JOB; + } + + public JobId getJobId() { + return jobId; + } + } + + public static class UndeployJobSpecFunction extends Function { + private static final long serialVersionUID = 1L; + + private final DeployedJobSpecId deployedJobSpecId; + + public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) { + this.deployedJobSpecId = deployedJobSpecId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.UNDEPLOY_JOB; + } + + public DeployedJobSpecId getDeployedJobSpecId() { + return deployedJobSpecId; + } + } + + public static class StartJobFunction extends Function { + private static final long serialVersionUID = 1L; + + private final byte[] acggfBytes; + private final Set<JobFlag> jobFlags; + private final DeploymentId deploymentId; + private final DeployedJobSpecId deployedJobSpecId; + private final Map<byte[], byte[]> jobParameters; + + public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags, + DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) { + this.acggfBytes = acggfBytes; + this.jobFlags = jobFlags; + this.deploymentId = deploymentId; + this.deployedJobSpecId = deployedJobSpecId; + this.jobParameters = jobParameters; + } + + public StartJobFunction(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) { + this(null, null, EnumSet.noneOf(JobFlag.class), deployedJobSpecId, jobParameters); + } + + public StartJobFunction(byte[] acggfBytes, Set<JobFlag> jobFlags) { + this(null, acggfBytes, jobFlags, null, null); + } + + public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags) { + this(deploymentId, acggfBytes, jobFlags, null, null); + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.START_JOB; + } + + public Map<byte[], byte[]> getJobParameters() { + return jobParameters; + } + + public DeployedJobSpecId getDeployedJobSpecId() { + return deployedJobSpecId; + } + + public byte[] getACGGFBytes() { + return acggfBytes; + } + + public Set<JobFlag> getJobFlags() { + return jobFlags; + } + + public DeploymentId getDeploymentId() { + return deploymentId; + } + } + + public static class GetResultDirectoryAddressFunction extends Function { + private static final long serialVersionUID = 1L; + + @Override + public FunctionId getFunctionId() { + return FunctionId.GET_RESULT_DIRECTORY_ADDRESS; + } + } + + public static class GetResultStatusFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + private final ResultSetId rsId; + + public GetResultStatusFunction(JobId jobId, ResultSetId rsId) { + this.jobId = jobId; + this.rsId = rsId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.GET_RESULT_STATUS; + } + + public JobId getJobId() { + return jobId; + } + + public ResultSetId getResultSetId() { + return rsId; + } + } + + public static class GetResultLocationsFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + private final ResultSetId rsId; + + private final ResultDirectoryRecord[] knownRecords; + + public GetResultLocationsFunction(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownRecords) { + this.jobId = jobId; + this.rsId = rsId; + this.knownRecords = knownRecords; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.GET_RESULT_LOCATIONS; + } + + public JobId getJobId() { + return jobId; + } + + public ResultSetId getResultSetId() { + return rsId; + } + + public ResultDirectoryRecord[] getKnownRecords() { + return knownRecords; + } + } + + public static class WaitForCompletionFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + public WaitForCompletionFunction(JobId jobId) { + this.jobId = jobId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.WAIT_FOR_COMPLETION; + } + + public JobId getJobId() { + return jobId; + } + } + + public static class GetNodeControllersInfoFunction extends Function { + private static final long serialVersionUID = 1L; + + @Override + public FunctionId getFunctionId() { + return FunctionId.GET_NODE_CONTROLLERS_INFO; + } + } + + public static class GetClusterTopologyFunction extends Function { + private static final long serialVersionUID = 1L; + + @Override + public FunctionId getFunctionId() { + return FunctionId.GET_CLUSTER_TOPOLOGY; + } + } + + public static class CliDeployBinaryFunction extends Function { + private static final long serialVersionUID = 1L; + private final List<URL> binaryURLs; + private final DeploymentId deploymentId; + + public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId deploymentId) { + this.binaryURLs = binaryURLs; + this.deploymentId = deploymentId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.CLI_DEPLOY_BINARY; + } + + public List<URL> getBinaryURLs() { + return binaryURLs; + } + + public DeploymentId getDeploymentId() { + return deploymentId; + } + } + + public static class CliUnDeployBinaryFunction extends Function { + private static final long serialVersionUID = 1L; + private final DeploymentId deploymentId; + + public CliUnDeployBinaryFunction(DeploymentId deploymentId) { + this.deploymentId = deploymentId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.CLI_UNDEPLOY_BINARY; + } + + public DeploymentId getDeploymentId() { + return deploymentId; + } + } + + public static class ClusterShutdownFunction extends Function { + private static final long serialVersionUID = 1L; + private final boolean terminateNCService; + + public ClusterShutdownFunction(boolean terminateNCService) { + this.terminateNCService = terminateNCService; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.CLUSTER_SHUTDOWN; + } + + public boolean isTerminateNCService() { + return terminateNCService; + } + } + + public static class GetNodeDetailsJSONFunction extends Function { + private static final long serialVersionUID = 1L; + private final String nodeId; + private final boolean includeStats; + private final boolean includeConfig; + + public GetNodeDetailsJSONFunction(String nodeId, boolean includeStats, boolean includeConfig) { + this.nodeId = nodeId; + this.includeStats = includeStats; + this.includeConfig = includeConfig; + } + + public String getNodeId() { + return nodeId; + } + + public boolean isIncludeStats() { + return includeStats; + } + + public boolean isIncludeConfig() { + return includeConfig; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.GET_NODE_DETAILS_JSON; + } + } + + public static class ThreadDumpFunction extends Function { + private final String node; + + public ThreadDumpFunction(String node) { + this.node = node; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.THREAD_DUMP; + } + + public String getNode() { + return node; + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java new file mode 100644 index 0000000..3fac7da --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.impl; + +import java.net.URL; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hyracks.api.client.IHyracksClientInterface; +import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.api.client.impl.ClusterControllerInfo; +import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobInfo; +import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.topology.ClusterTopology; +import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.api.RPCInterface; +import org.apache.hyracks.ipc.exceptions.IPCException; + +public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface { + private static final int SHUTDOWN_CONNECTION_TIMEOUT_SECS = 30; + + private final IIPCHandle ipcHandle; + + private final RPCInterface rpci; + + public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) { + this.ipcHandle = ipcHandle; + this.rpci = rpci; + } + + @Override + public ClusterControllerInfo getClusterControllerInfo() throws Exception { + HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = + new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction(); + return (ClusterControllerInfo) rpci.call(ipcHandle, gccif); + } + + @Override + public JobStatus getJobStatus(JobId jobId) throws Exception { + HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = + new HyracksClientInterfaceFunctions.GetJobStatusFunction(jobId); + return (JobStatus) rpci.call(ipcHandle, gjsf); + } + + @Override + public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception { + HyracksClientInterfaceFunctions.StartJobFunction sjf = + new HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags); + return (JobId) rpci.call(ipcHandle, sjf); + } + + @Override + public void cancelJob(JobId jobId) throws Exception { + HyracksClientInterfaceFunctions.CancelJobFunction cjf = + new HyracksClientInterfaceFunctions.CancelJobFunction(jobId); + rpci.call(ipcHandle, cjf); + } + + @Override + public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception { + HyracksClientInterfaceFunctions.StartJobFunction sjf = + new HyracksClientInterfaceFunctions.StartJobFunction(deployedJobSpecId, jobParameters); + return (JobId) rpci.call(ipcHandle, sjf); + } + + @Override + public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception { + HyracksClientInterfaceFunctions.StartJobFunction sjf = + new HyracksClientInterfaceFunctions.StartJobFunction(deploymentId, acggfBytes, jobFlags); + return (JobId) rpci.call(ipcHandle, sjf); + } + + @Override + public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception { + HyracksClientInterfaceFunctions.DeployJobSpecFunction sjf = + new HyracksClientInterfaceFunctions.DeployJobSpecFunction(acggfBytes); + return (DeployedJobSpecId) rpci.call(ipcHandle, sjf); + } + + @Override + public void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) throws Exception { + HyracksClientInterfaceFunctions.redeployJobSpecFunction udjsf = + new HyracksClientInterfaceFunctions.redeployJobSpecFunction(deployedJobSpecId, acggfBytes); + rpci.call(ipcHandle, udjsf); + } + + @Override + public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception { + HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf = + new HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId); + rpci.call(ipcHandle, sjf); + } + + @Override + public NetworkAddress getResultDirectoryAddress() throws Exception { + HyracksClientInterfaceFunctions.GetResultDirectoryAddressFunction gddsf = + new HyracksClientInterfaceFunctions.GetResultDirectoryAddressFunction(); + return (NetworkAddress) rpci.call(ipcHandle, gddsf); + } + + @Override + public void waitForCompletion(JobId jobId) throws Exception { + HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = + new HyracksClientInterfaceFunctions.WaitForCompletionFunction(jobId); + rpci.call(ipcHandle, wfcf); + } + + @Override + public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception { + HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = + new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction(); + return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif); + } + + @Override + public ClusterTopology getClusterTopology() throws Exception { + HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf = + new HyracksClientInterfaceFunctions.GetClusterTopologyFunction(); + return (ClusterTopology) rpci.call(ipcHandle, gctf); + } + + @Override + public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception { + HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = + new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId); + rpci.call(ipcHandle, dbf); + } + + @Override + public void unDeployBinary(DeploymentId deploymentId) throws Exception { + HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf = + new HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(deploymentId); + rpci.call(ipcHandle, dbf); + } + + @Override + public JobInfo getJobInfo(JobId jobId) throws Exception { + HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = + new HyracksClientInterfaceFunctions.GetJobInfoFunction(jobId); + return (JobInfo) rpci.call(ipcHandle, gjsf); + } + + @Override + public void stopCluster(boolean terminateNCService) throws Exception { + HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf = + new HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService); + rpci.call(ipcHandle, csdf); + int i = 0; + // give the CC some time to do final settling after it returns our request + while (ipcHandle.isConnected() && i++ < SHUTDOWN_CONNECTION_TIMEOUT_SECS) { + synchronized (this) { + wait(TimeUnit.SECONDS.toMillis(1)); + } + } + if (ipcHandle.isConnected()) { + throw new IPCException( + "CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS + " seconds"); + } + } + + @Override + public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception { + HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gjsf = + new HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction(nodeId, includeStats, includeConfig); + return (String) rpci.call(ipcHandle, gjsf); + } + + @Override + public String getThreadDump(String node) throws Exception { + HyracksClientInterfaceFunctions.ThreadDumpFunction tdf = + new HyracksClientInterfaceFunctions.ThreadDumpFunction(node); + return (String) rpci.call(ipcHandle, tdf); + } + + @Override + public boolean isConnected() { + return ipcHandle.isConnected(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java new file mode 100644 index 0000000..e6c28fa --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.impl; + +import java.io.File; +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.FileEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.client.IHyracksClientInterface; +import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.api.client.impl.ClusterControllerInfo; +import org.apache.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory; +import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobInfo; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.topology.ClusterTopology; +import org.apache.hyracks.api.util.InvokeUtil; +import org.apache.hyracks.api.util.JavaSerializationUtils; +import org.apache.hyracks.ipc.api.RPCInterface; +import org.apache.hyracks.util.ExitUtil; +import org.apache.hyracks.util.InterruptibleAction; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster + * Controller. + * + * @author vinayakb + */ +public final class HyracksConnection implements IHyracksClientConnection { + + private static final Logger LOGGER = LogManager.getLogger(); + + private final String ccHost; + + private final int ccPort; + + private final IPCSystem ipc; + + private final IHyracksClientInterface hci; + + private final ClusterControllerInfo ccInfo; + + private volatile boolean running = false; + + private volatile long reqId = 0L; + + private final ExecutorService uninterruptibleExecutor = + Executors.newFixedThreadPool(2, r -> new Thread(r, "HyracksConnection Uninterrubtible thread: ")); + + private final BlockingQueue<UnInterruptibleRequest<?>> uninterruptibles = new ArrayBlockingQueue<>(1); + + /** + * Constructor to create a connection to the Hyracks Cluster Controller. + * + * @param ccHost + * Host name (or IP Address) where the Cluster Controller can be + * reached. + * @param ccPort + * Port to reach the Hyracks Cluster Controller at the specified + * host name. + * @throws Exception + */ + public HyracksConnection(String ccHost, int ccPort) throws Exception { + this.ccHost = ccHost; + this.ccPort = ccPort; + RPCInterface rpci = new RPCInterface(); + ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer()); + ipc.start(); + hci = new HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new InetSocketAddress(ccHost, ccPort)), + rpci); + ccInfo = hci.getClusterControllerInfo(); + uninterruptibleExecutor.execute(new UninterrubtileRequestHandler()); + uninterruptibleExecutor.execute(new UninterrubtileHandlerWatcher()); + } + + @Override + public JobStatus getJobStatus(JobId jobId) throws Exception { + return hci.getJobStatus(jobId); + } + + @Override + public void cancelJob(JobId jobId) throws Exception { + CancelJobRequest request = new CancelJobRequest(jobId); + uninterruptiblySubmitAndExecute(request); + } + + @Override + public JobId startJob(JobSpecification jobSpec) throws Exception { + return startJob(jobSpec, EnumSet.noneOf(JobFlag.class)); + } + + @Override + public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception { + IActivityClusterGraphGeneratorFactory jsacggf = + new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); + return startJob(jsacggf, jobFlags); + } + + @Override + public void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec) throws Exception { + JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = + new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); + hci.redeployJobSpec(deployedJobSpecId, JavaSerializationUtils.serialize(jsacggf)); + } + + @Override + public DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception { + JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = + new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); + return deployJobSpec(jsacggf); + } + + @Override + public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception { + hci.undeployJobSpec(deployedJobSpecId); + } + + @Override + public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception { + StartDeployedJobRequest request = new StartDeployedJobRequest(deployedJobSpecId, jobParameters); + return interruptiblySubmitAndExecute(request); + } + + @Override + public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception { + return startJob(null, acggf, jobFlags); + } + + public DeployedJobSpecId deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception { + return hci.deployJobSpec(JavaSerializationUtils.serialize(acggf)); + } + + @Override + public NetworkAddress getResultDirectoryAddress() throws Exception { + return hci.getResultDirectoryAddress(); + } + + @Override + public void waitForCompletion(JobId jobId) throws Exception { + try { + hci.waitForCompletion(jobId); + } catch (InterruptedException e) { + // Cancels an on-going job if the current thread gets interrupted. + cancelJob(jobId); + throw e; + } + } + + @Override + public Map<String, NodeControllerInfo> getNodeControllerInfos() throws HyracksException { + try { + return hci.getNodeControllersInfo(); + } catch (Exception e) { + throw HyracksException.create(e); + } + } + + @Override + public ClusterTopology getClusterTopology() throws HyracksException { + try { + return hci.getClusterTopology(); + } catch (Exception e) { + throw HyracksException.create(e); + } + } + + @Override + public DeploymentId deployBinary(List<String> jars) throws Exception { + /** generate a deployment id */ + DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString()); + List<URL> binaryURLs = new ArrayList<>(); + if (jars != null && !jars.isEmpty()) { + CloseableHttpClient hc = new DefaultHttpClient(); + try { + /** upload jars through a http client one-by-one to the CC server */ + for (String jar : jars) { + int slashIndex = jar.lastIndexOf('/'); + String fileName = jar.substring(slashIndex + 1); + String url = "http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/" + + deploymentId.toString() + "&" + fileName; + HttpPut put = new HttpPut(url); + put.setEntity(new FileEntity(new File(jar), "application/octet-stream")); + HttpResponse response = hc.execute(put); + response.getEntity().consumeContent(); + if (response.getStatusLine().getStatusCode() != 200) { + hci.unDeployBinary(deploymentId); + throw new HyracksException(response.getStatusLine().toString()); + } + /** add the uploaded URL address into the URLs of jars to be deployed at NCs */ + binaryURLs.add(new URL(url)); + } + } finally { + hc.close(); + } + } + /** deploy the URLs to the CC and NCs */ + hci.deployBinary(binaryURLs, deploymentId); + return deploymentId; + } + + @Override + public void unDeployBinary(DeploymentId deploymentId) throws Exception { + hci.unDeployBinary(deploymentId); + } + + @Override + public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception { + return startJob(deploymentId, jobSpec, EnumSet.noneOf(JobFlag.class)); + } + + @Override + public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) + throws Exception { + IActivityClusterGraphGeneratorFactory jsacggf = + new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); + return startJob(deploymentId, jsacggf, jobFlags); + } + + @Override + public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, + EnumSet<JobFlag> jobFlags) throws Exception { + StartJobRequest request = new StartJobRequest(deploymentId, acggf, jobFlags); + return interruptiblySubmitAndExecute(request); + } + + @Override + public JobInfo getJobInfo(JobId jobId) throws Exception { + return hci.getJobInfo(jobId); + } + + @Override + public void stopCluster(boolean terminateNCService) throws Exception { + hci.stopCluster(terminateNCService); + } + + @Override + public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception { + return hci.getNodeDetailsJSON(nodeId, includeStats, includeConfig); + } + + @Override + public String getThreadDump(String node) throws Exception { + return hci.getThreadDump(node); + } + + @Override + public String getHost() { + return ccHost; + } + + @Override + public int getPort() { + return ccPort; + } + + @Override + public boolean isConnected() { + return hci.isConnected(); + } + + private <T> T uninterruptiblySubmitAndExecute(UnInterruptibleRequest<T> request) throws Exception { + InvokeUtil.doUninterruptibly(() -> uninterruptibles.put(request)); + return uninterruptiblyExecute(request); + } + + private <T> T uninterruptiblyExecute(UnInterruptibleRequest<T> request) throws Exception { + InvokeUtil.doUninterruptibly(request); + return request.result(); + } + + private <T> T interruptiblySubmitAndExecute(UnInterruptibleRequest<T> request) throws Exception { + uninterruptibles.put(request); + return uninterruptiblyExecute(request); + } + + private abstract class UnInterruptibleRequest<T> implements InterruptibleAction { + boolean completed = false; + boolean failed = false; + Throwable failure = null; + T response = null; + + @SuppressWarnings("squid:S1181") + private final void handle() { + try { + response = doHandle(); + } catch (Throwable th) { + failed = true; + failure = th; + } finally { + synchronized (this) { + completed = true; + notifyAll(); + } + } + } + + protected abstract T doHandle() throws Exception; + + @Override + public final synchronized void run() throws InterruptedException { + while (!completed) { + wait(); + } + } + + public T result() throws Exception { + if (failed) { + if (failure instanceof Error) { + throw (Error) failure; + } + throw (Exception) failure; + } + return response; + } + } + + private class CancelJobRequest extends UnInterruptibleRequest<Void> { + final JobId jobId; + + public CancelJobRequest(JobId jobId) { + this.jobId = jobId; + } + + @Override + protected Void doHandle() throws Exception { + hci.cancelJob(jobId); + return null; + } + + @Override + public String toString() { + return "CancelJobRequest: " + jobId.toString(); + } + + } + + private class StartDeployedJobRequest extends UnInterruptibleRequest<JobId> { + + private final DeployedJobSpecId deployedJobSpecId; + private final Map<byte[], byte[]> jobParameters; + + public StartDeployedJobRequest(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) { + this.deployedJobSpecId = deployedJobSpecId; + this.jobParameters = jobParameters; + } + + @Override + protected JobId doHandle() throws Exception { + return hci.startJob(deployedJobSpecId, jobParameters); + } + + } + + private class StartJobRequest extends UnInterruptibleRequest<JobId> { + private final DeploymentId deploymentId; + private final IActivityClusterGraphGeneratorFactory acggf; + private final EnumSet<JobFlag> jobFlags; + + public StartJobRequest(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, + EnumSet<JobFlag> jobFlags) { + this.deploymentId = deploymentId; + this.acggf = acggf; + this.jobFlags = jobFlags; + } + + @Override + protected JobId doHandle() throws Exception { + if (deploymentId == null) { + return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags); + } else { + return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags); + } + } + + @Override + public String toString() { + return "StartJobRequest"; + } + + } + + private class UninterrubtileRequestHandler implements Runnable { + @SuppressWarnings({ "squid:S2189", "squid:S2142" }) + @Override + public void run() { + String nameBefore = Thread.currentThread().getName(); + Thread.currentThread().setName(nameBefore + getClass().getSimpleName()); + try { + while (true) { + try { + UnInterruptibleRequest<?> current = uninterruptibles.take(); + reqId++; + running = true; + current.handle(); + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted."); + continue; + } finally { + running = false; + } + } + } finally { + Thread.currentThread().setName(nameBefore); + } + } + } + + public class UninterrubtileHandlerWatcher implements Runnable { + @Override + @SuppressWarnings({ "squid:S2189", "squid:S2142" }) + public void run() { + String nameBefore = Thread.currentThread().getName(); + Thread.currentThread().setName(nameBefore + getClass().getSimpleName()); + try { + long currentReqId = 0L; + long currentTime = System.nanoTime(); + while (true) { + try { + TimeUnit.MINUTES.sleep(1); + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted."); + continue; + } + if (running) { + if (reqId == currentReqId) { + if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) { + ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST); + } + } else { + currentReqId = reqId; + currentTime = System.nanoTime(); + } + } + } + } finally { + Thread.currentThread().setName(nameBefore); + } + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/pom.xml b/hyracks-fullstack/hyracks/pom.xml index 3b4178d..5990526 100644 --- a/hyracks-fullstack/hyracks/pom.xml +++ b/hyracks-fullstack/hyracks/pom.xml @@ -72,8 +72,8 @@ <modules> <module>hyracks-util</module> - <module>hyracks-ipc</module> <module>hyracks-api</module> + <module>hyracks-ipc</module> <module>hyracks-comm</module> <module>hyracks-client</module> <module>hyracks-dataflow-common</module>
