http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java deleted file mode 100644 index 8e9e3dc..0000000 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.api.dataset; - -import java.util.Arrays; - -public class ResultSetMetaData { - private final DatasetDirectoryRecord[] records; - private final boolean ordered; - - ResultSetMetaData(int len, boolean ordered) { - this.records = new DatasetDirectoryRecord[len]; - this.ordered = ordered; - } - - public boolean getOrderedResult() { - return ordered; - } - - public DatasetDirectoryRecord[] getRecords() { - return records; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("{ordered: ").append(ordered).append(", records: ").append(Arrays.toString(records)); - return sb.toString(); - } -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java index c4c7320..b0a8017 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java @@ -40,7 +40,7 @@ import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.api.job.resource.ClusterCapacity; import org.apache.hyracks.api.job.resource.IClusterCapacity; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java index 8a3e15a..8343fe0 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java @@ -20,7 +20,7 @@ package org.apache.hyracks.api.partitions; import java.io.Serializable; -import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.api.job.JobId; public final class ResultSetPartitionId implements Serializable { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java new file mode 100644 index 0000000..41b9d1a --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.result; + +import org.apache.hyracks.api.result.ResultJobRecord.Status; +import org.apache.hyracks.api.job.JobId; + +public interface IResultDirectory { + /** + * Gets the result status for the given result set. + * + * @param jobId + * ID of the job + * @param rsId + * ID of the result set + * @return {@link Status} + * @throws Exception + */ + Status getResultStatus(JobId jobId, ResultSetId rsId) throws Exception; + + /** + * Gets the IP Addresses and ports for the partition generating the result for each location. + * + * @param jobId + * ID of the job + * @param rsId + * ID of the result set + * @param knownRecords + * Locations that are already known to the client + * @return {@link ResultDirectoryRecord[]} + * @throws Exception + */ + ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownRecords) + throws Exception; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java new file mode 100644 index 0000000..d45cf44 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.result; + +import java.util.Set; + +import org.apache.hyracks.api.job.JobId; + +public interface IResultManager { + + Set<JobId> getJobIds(); + + IResultStateRecord getState(JobId jobId); + + void sweep(JobId jobId); + + /** + * Removes all references and deletes persisted files for + * all expired results. + */ + void sweepExpiredResultSets(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java new file mode 100644 index 0000000..a539d37 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.result; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobId; + +public interface IResultPartitionManager extends IResultManager { + IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult, + boolean asyncMode, int partition, int nPartitions, long maxReads) throws HyracksException; + + void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions, + boolean orderedResult, boolean emptyResult) throws HyracksException; + + void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException; + + void initializeResultPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc) + throws HyracksException; + + void removePartition(JobId jobId, ResultSetId resultSetId, int partition); + + void abortReader(JobId jobId); + + void close(); + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java new file mode 100644 index 0000000..ff2e48a --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.result; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; + +public interface IResultSet { + IResultSetReader createReader(JobId jobId, ResultSetId resultSetId) throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java new file mode 100644 index 0000000..0884c53 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.result; + +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.result.ResultJobRecord.Status; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface IResultSetReader { + Status getResultStatus(); + + int read(IFrame frame) throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java new file mode 100644 index 0000000..fe6bc15 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.result; + +public interface IResultStateRecord { + long getTimestamp(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java new file mode 100644 index 0000000..71792be --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.result; + +import java.io.Serializable; + +import org.apache.hyracks.api.comm.NetworkAddress; + +public class ResultDirectoryRecord implements Serializable { + public enum Status { + IDLE, + RUNNING, + SUCCESS, + FAILED + } + + private static final long serialVersionUID = 1L; + + private NetworkAddress address; + + private boolean readEOS; + + private Status status; + + private boolean empty; + + public ResultDirectoryRecord() { + this.address = null; + this.readEOS = false; + this.status = Status.IDLE; + } + + public void setNetworkAddress(NetworkAddress address) { + this.address = address; + } + + public NetworkAddress getNetworkAddress() { + return address; + } + + public void setEmpty(boolean empty) { + this.empty = empty; + } + + public boolean isEmpty() { + return empty; + } + + public void readEOS() { + this.readEOS = true; + } + + public boolean hasReachedReadEOS() { + return readEOS; + } + + public void start() { + updateStatus(Status.RUNNING); + } + + public void writeEOS() { + updateStatus(Status.SUCCESS); + } + + public void fail() { + status = Status.FAILED; + } + + private void updateStatus(final ResultDirectoryRecord.Status newStatus) { + // FAILED is a stable status + if (status != Status.FAILED) { + status = newStatus; + } + } + + public Status getStatus() { + return status; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof ResultDirectoryRecord)) { + return false; + } + return address.equals(((ResultDirectoryRecord) o).address); + } + + @Override + public String toString() { + return String.valueOf(address) + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : ""); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java new file mode 100644 index 0000000..b8ddbd2 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.result; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class ResultJobRecord implements IResultStateRecord { + public enum State { + IDLE, + RUNNING, + SUCCESS, + FAILED + } + + public static class Status implements Serializable { + + private static final long serialVersionUID = 1L; + + State state = State.IDLE; + + private List<Exception> exceptions; + + public State getState() { + return state; + } + + void setState(State state) { + this.state = state; + } + + public List<Exception> getExceptions() { + return exceptions; + } + + void setExceptions(List<Exception> exceptions) { + this.exceptions = exceptions; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ \"state\": \"").append(state.name()).append("\""); + if (exceptions != null && !exceptions.isEmpty()) { + sb.append(", \"exceptions\": "); + List<String> msgs = new ArrayList<>(); + exceptions.forEach(e -> msgs.add("\"" + e.getMessage() + "\"")); + sb.append(Arrays.toString(msgs.toArray())); + } + sb.append(" }"); + return sb.toString(); + } + } + + private static final long serialVersionUID = 1L; + + private final long timestamp; + + private Status status; + + private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>(); + + public ResultJobRecord() { + this.timestamp = System.nanoTime(); + this.status = new Status(); + } + + private void updateState(State newStatus) { + // FAILED is a stable status + if (status.state != State.FAILED) { + status.setState(newStatus); + } + } + + public void start() { + updateState(State.RUNNING); + } + + public void success() { + updateState(State.SUCCESS); + } + + public void fail(ResultSetId rsId, int partition) { + getOrCreateDirectoryRecord(rsId, partition).fail(); + } + + public void fail(List<Exception> exceptions) { + updateState(State.FAILED); + status.setExceptions(exceptions); + } + + @Override + public long getTimestamp() { + return timestamp; + } + + public Status getStatus() { + return status; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ \"status\": ").append(status.toString()).append(", "); + sb.append("\"timestamp\": ").append(timestamp).append(", "); + sb.append("\"resultsets\": ").append(Arrays.toString(resultSetMetadataMap.entrySet().toArray())).append(" }"); + return sb.toString(); + } + + public void setResultSetMetaData(ResultSetId rsId, boolean orderedResult, int nPartitions) + throws HyracksDataException { + ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId); + if (rsMd == null) { + resultSetMetadataMap.put(rsId, new ResultSetMetaData(nPartitions, orderedResult)); + } else if (rsMd.getOrderedResult() != orderedResult || rsMd.getRecords().length != nPartitions) { + throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, rsId.toString()); + } + //TODO(tillw) throwing a HyracksDataException here hangs the execution tests + } + + public ResultSetMetaData getResultSetMetaData(ResultSetId rsId) { + return resultSetMetadataMap.get(rsId); + } + + public synchronized ResultDirectoryRecord getOrCreateDirectoryRecord(ResultSetId rsId, int partition) { + ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords(); + if (records[partition] == null) { + records[partition] = new ResultDirectoryRecord(); + } + return records[partition]; + } + + public synchronized ResultDirectoryRecord getDirectoryRecord(ResultSetId rsId, int partition) + throws HyracksDataException { + ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords(); + if (records[partition] == null) { + throw HyracksDataException.create(ErrorCode.RESULT_NO_RECORD, partition, rsId); + } + return records[partition]; + } + + public synchronized void updateState(ResultSetId rsId) { + int successCount = 0; + ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords(); + for (ResultDirectoryRecord record : records) { + if ((record != null) && (record.getStatus() == ResultDirectoryRecord.Status.SUCCESS)) { + successCount++; + } + } + if (successCount == records.length) { + success(); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java new file mode 100644 index 0000000..ffd4ced --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.result; + +import java.io.Serializable; + +public class ResultSetId implements Serializable { + private static final long serialVersionUID = 1L; + + private final long id; + + public ResultSetId(long id) { + this.id = id; + } + + public long getId() { + return id; + } + + @Override + public int hashCode() { + return (int) id; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof ResultSetId)) { + return false; + } + return ((ResultSetId) o).id == id; + } + + @Override + public String toString() { + return "RSID:" + id; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java new file mode 100644 index 0000000..b7b8f1c --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.result; + +import java.util.Arrays; + +public class ResultSetMetaData { + private final ResultDirectoryRecord[] records; + private final boolean ordered; + + ResultSetMetaData(int len, boolean ordered) { + this.records = new ResultDirectoryRecord[len]; + this.ordered = ordered; + } + + public boolean getOrderedResult() { + return ordered; + } + + public ResultDirectoryRecord[] getRecords() { + return records; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ordered: ").append(ordered).append(", records: ").append(Arrays.toString(records)); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java deleted file mode 100644 index 7f549ca..0000000 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.client.dataset; - -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.comm.NetworkAddress; -import org.apache.hyracks.api.context.IHyracksCommonContext; -import org.apache.hyracks.api.dataset.IHyracksDataset; -import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection; -import org.apache.hyracks.api.dataset.IHyracksDatasetReader; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.client.net.ClientNetworkManager; -import org.apache.hyracks.control.nc.resources.memory.FrameManager; - -public class HyracksDataset implements IHyracksDataset { - private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection; - - private final ClientNetworkManager netManager; - - private final IHyracksCommonContext datasetClientCtx; - - public HyracksDataset(IHyracksClientConnection hcc, int frameSize, int nReaders) throws Exception { - NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo(); - datasetDirectoryServiceConnection = - new HyracksDatasetDirectoryServiceConnection(ddsAddress.getAddress(), ddsAddress.getPort()); - - netManager = new ClientNetworkManager(nReaders); - netManager.start(); - - datasetClientCtx = new DatasetClientContext(frameSize); - } - - @Override - public IHyracksDatasetReader createReader(JobId jobId, ResultSetId resultSetId) throws HyracksDataException { - IHyracksDatasetReader reader = null; - try { - reader = new HyracksDatasetReader(datasetDirectoryServiceConnection, netManager, datasetClientCtx, jobId, - resultSetId); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - return reader; - } - - static class DatasetClientContext extends FrameManager implements IHyracksCommonContext { - - DatasetClientContext(int frameSize) { - super(frameSize); - } - - @Override - public IIOManager getIoManager() { - return null; - } - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java deleted file mode 100644 index 63139d9..0000000 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.client.dataset; - -import java.net.InetSocketAddress; - -import org.apache.hyracks.api.dataset.DatasetDirectoryRecord; -import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; -import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection; -import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.ipc.api.IIPCHandle; -import org.apache.hyracks.ipc.api.RPCInterface; -import org.apache.hyracks.ipc.impl.IPCSystem; -import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; - -//TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client? -public class HyracksDatasetDirectoryServiceConnection implements IHyracksDatasetDirectoryServiceConnection { - private final IPCSystem ipc; - private final IHyracksDatasetDirectoryServiceInterface ddsi; - - public HyracksDatasetDirectoryServiceConnection(String ddsHost, int ddsPort) throws Exception { - RPCInterface rpci = new RPCInterface(); - ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer()); - ipc.start(); - IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new InetSocketAddress(ddsHost, ddsPort)); - this.ddsi = new HyracksDatasetDirectoryServiceInterfaceRemoteProxy(ddsIpchandle, rpci); - } - - @Override - public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception { - return ddsi.getDatasetResultStatus(jobId, rsId); - } - - @Override - public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId, - DatasetDirectoryRecord[] knownRecords) throws Exception { - return ddsi.getDatasetResultLocations(jobId, rsId, knownRecords); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java deleted file mode 100644 index 7eeb913..0000000 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.client.dataset; - -import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions; -import org.apache.hyracks.api.dataset.DatasetDirectoryRecord; -import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; -import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.ipc.api.IIPCHandle; -import org.apache.hyracks.ipc.api.RPCInterface; - -//TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client? -public class HyracksDatasetDirectoryServiceInterfaceRemoteProxy implements IHyracksDatasetDirectoryServiceInterface { - private final IIPCHandle ipcHandle; - - private final RPCInterface rpci; - - public HyracksDatasetDirectoryServiceInterfaceRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) { - this.ipcHandle = ipcHandle; - this.rpci = rpci; - } - - @Override - public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception { - HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = - new HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction(jobId, rsId); - return (Status) rpci.call(ipcHandle, gdrlf); - } - - @Override - public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId, - DatasetDirectoryRecord[] knownRecords) throws Exception { - HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = - new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(jobId, rsId, knownRecords); - return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java deleted file mode 100644 index b1566f4..0000000 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.client.dataset; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; - -import org.apache.hyracks.api.channels.IInputChannel; -import org.apache.hyracks.api.channels.IInputChannelMonitor; -import org.apache.hyracks.api.comm.FrameHelper; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.NetworkAddress; -import org.apache.hyracks.api.context.IHyracksCommonContext; -import org.apache.hyracks.api.dataset.DatasetDirectoryRecord; -import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; -import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection; -import org.apache.hyracks.api.dataset.IHyracksDatasetReader; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.exceptions.ErrorCode; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.client.net.ClientNetworkManager; -import org.apache.hyracks.comm.channels.DatasetNetworkInputChannel; -import org.apache.hyracks.util.annotations.NotThreadSafe; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -@NotThreadSafe -public class HyracksDatasetReader implements IHyracksDatasetReader { - - private static final Logger LOGGER = LogManager.getLogger(); - private static final int NUM_READ_BUFFERS = 1; - private final IHyracksDatasetDirectoryServiceConnection datasetDirectory; - private final ClientNetworkManager netManager; - private final IHyracksCommonContext datasetClientCtx; - private final JobId jobId; - private final ResultSetId resultSetId; - private DatasetDirectoryRecord[] knownRecords; - private DatasetInputChannelMonitor[] monitors; - private DatasetInputChannelMonitor currentRecordMonitor; - private DatasetNetworkInputChannel currentRecordChannel; - private int currentRecord; - - public HyracksDatasetReader(IHyracksDatasetDirectoryServiceConnection datasetDirectory, - ClientNetworkManager netManager, IHyracksCommonContext datasetClientCtx, JobId jobId, - ResultSetId resultSetId) { - this.datasetDirectory = datasetDirectory; - this.netManager = netManager; - this.datasetClientCtx = datasetClientCtx; - this.jobId = jobId; - this.resultSetId = resultSetId; - currentRecord = -1; - } - - @Override - public Status getResultStatus() { - try { - return datasetDirectory.getDatasetResultStatus(jobId, resultSetId); - } catch (HyracksDataException e) { - if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) { - LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e); - } - } catch (Exception e) { - LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e); - } - return null; - } - - @Override - public int read(IFrame frame) throws HyracksDataException { - frame.reset(); - int readSize = 0; - if (isFirstRead() && !hasNextRecord()) { - return readSize; - } - // read until frame is full or all dataset records have been read - while (readSize < frame.getFrameSize()) { - if (currentRecordMonitor.hasMoreFrames()) { - final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer(); - if (readBuffer == null) { - throw new IllegalStateException("Unexpected empty frame"); - } - currentRecordMonitor.notifyFrameRead(); - if (readSize == 0) { - final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer); - frame.ensureFrameSize(frame.getMinSize() * nBlocks); - frame.getBuffer().clear(); - } - frame.getBuffer().put(readBuffer); - currentRecordChannel.recycleBuffer(readBuffer); - readSize = frame.getBuffer().position(); - } else { - currentRecordChannel.close(); - if (currentRecordMonitor.failed()) { - throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId); - } - if (isLastRecord() || !hasNextRecord()) { - break; - } - } - } - frame.getBuffer().flip(); - return readSize; - } - - private SocketAddress getSocketAddress(DatasetDirectoryRecord record) throws HyracksDataException { - try { - final NetworkAddress netAddr = record.getNetworkAddress(); - return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort()); - } catch (UnknownHostException e) { - throw HyracksDataException.create(e); - } - } - - private DatasetInputChannelMonitor getMonitor(int partition) { - if (knownRecords == null || knownRecords[partition] == null) { - throw new IllegalStateException("Accessing monitors before obtaining the corresponding addresses"); - } - if (monitors == null) { - monitors = new DatasetInputChannelMonitor[knownRecords.length]; - } - if (monitors[partition] == null) { - monitors[partition] = new DatasetInputChannelMonitor(); - } - return monitors[partition]; - } - - private boolean hasNextRecord() throws HyracksDataException { - currentRecord++; - DatasetDirectoryRecord record = getRecord(currentRecord); - // skip empty records - while (record.isEmpty() && ++currentRecord < knownRecords.length) { - record = getRecord(currentRecord); - } - if (currentRecord == knownRecords.length) { - // exhausted all known records - return false; - } - requestRecordData(record); - return true; - } - - private DatasetDirectoryRecord getRecord(int recordNum) throws HyracksDataException { - try { - while (knownRecords == null || knownRecords[recordNum] == null) { - knownRecords = datasetDirectory.getDatasetResultLocations(jobId, resultSetId, knownRecords); - } - return knownRecords[recordNum]; - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } - - private void requestRecordData(DatasetDirectoryRecord record) throws HyracksDataException { - currentRecordChannel = new DatasetNetworkInputChannel(netManager, getSocketAddress(record), jobId, resultSetId, - currentRecord, NUM_READ_BUFFERS); - currentRecordMonitor = getMonitor(currentRecord); - currentRecordChannel.registerMonitor(currentRecordMonitor); - currentRecordChannel.open(datasetClientCtx); - } - - private boolean isFirstRead() { - return currentRecord == -1; - } - - private boolean isLastRecord() { - return knownRecords != null && currentRecord == knownRecords.length - 1; - } - - private static class DatasetInputChannelMonitor implements IInputChannelMonitor { - - private int availableFrames; - private boolean eos; - private boolean failed; - - DatasetInputChannelMonitor() { - eos = false; - failed = false; - } - - @Override - public synchronized void notifyFailure(IInputChannel channel, int errorCode) { - failed = true; - notifyAll(); - } - - @Override - public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) { - availableFrames += nFrames; - notifyAll(); - } - - @Override - public synchronized void notifyEndOfStream(IInputChannel channel) { - eos = true; - notifyAll(); - } - - synchronized boolean failed() { - return failed; - } - - synchronized void notifyFrameRead() { - availableFrames--; - notifyAll(); - } - - synchronized boolean hasMoreFrames() throws HyracksDataException { - while (!failed && !eos && availableFrames == 0) { - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw HyracksDataException.create(e); - } - } - return !failed && !isFullyConsumed(); - } - - private synchronized boolean isFullyConsumed() { - return availableFrames == 0 && eos; - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java new file mode 100644 index 0000000..6f8c4d0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.client.result; + +import java.net.InetSocketAddress; + +import org.apache.hyracks.api.result.ResultJobRecord.Status; +import org.apache.hyracks.api.result.IResultDirectory; +import org.apache.hyracks.api.result.ResultDirectoryRecord; +import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.api.RPCInterface; +import org.apache.hyracks.ipc.impl.IPCSystem; +import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; + +//TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client? +public class ResultDirectory implements IResultDirectory { + private final IPCSystem ipc; + private final IResultDirectory remoteResultDirectory; + + public ResultDirectory(String resultHost, int resultPort) throws Exception { + RPCInterface rpci = new RPCInterface(); + ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer()); + ipc.start(); + IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new InetSocketAddress(resultHost, resultPort)); + this.remoteResultDirectory = new ResultDirectoryRemoteProxy(ddsIpchandle, rpci); + } + + @Override + public Status getResultStatus(JobId jobId, ResultSetId rsId) throws Exception { + return remoteResultDirectory.getResultStatus(jobId, rsId); + } + + @Override + public ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId rsId, + ResultDirectoryRecord[] knownRecords) throws Exception { + return remoteResultDirectory.getResultLocations(jobId, rsId, knownRecords); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java new file mode 100644 index 0000000..77c6e4b --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.client.result; + +import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions; +import org.apache.hyracks.api.result.ResultJobRecord.Status; +import org.apache.hyracks.api.result.IResultDirectory; +import org.apache.hyracks.api.result.ResultDirectoryRecord; +import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.api.RPCInterface; + +//TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client? +public class ResultDirectoryRemoteProxy implements IResultDirectory { + private final IIPCHandle ipcHandle; + + private final RPCInterface rpci; + + public ResultDirectoryRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) { + this.ipcHandle = ipcHandle; + this.rpci = rpci; + } + + @Override + public Status getResultStatus(JobId jobId, ResultSetId rsId) throws Exception { + HyracksClientInterfaceFunctions.GetResultStatusFunction gdrlf = + new HyracksClientInterfaceFunctions.GetResultStatusFunction(jobId, rsId); + return (Status) rpci.call(ipcHandle, gdrlf); + } + + @Override + public ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId rsId, + ResultDirectoryRecord[] knownRecords) throws Exception { + HyracksClientInterfaceFunctions.GetResultLocationsFunction gdrlf = + new HyracksClientInterfaceFunctions.GetResultLocationsFunction(jobId, rsId, knownRecords); + return (ResultDirectoryRecord[]) rpci.call(ipcHandle, gdrlf); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java new file mode 100644 index 0000000..ef93cce --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.client.result; + +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.result.IResultSet; +import org.apache.hyracks.api.result.IResultDirectory; +import org.apache.hyracks.api.result.IResultSetReader; +import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.client.net.ClientNetworkManager; +import org.apache.hyracks.control.nc.resources.memory.FrameManager; + +public class ResultSet implements IResultSet { + private final IResultDirectory resultDirectory; + + private final ClientNetworkManager netManager; + + private final IHyracksCommonContext resultClientCtx; + + public ResultSet(IHyracksClientConnection hcc, int frameSize, int nReaders) throws Exception { + NetworkAddress ddsAddress = hcc.getResultDirectoryAddress(); + resultDirectory = new ResultDirectory(ddsAddress.getAddress(), ddsAddress.getPort()); + + netManager = new ClientNetworkManager(nReaders); + netManager.start(); + + resultClientCtx = new ResultClientContext(frameSize); + } + + @Override + public IResultSetReader createReader(JobId jobId, ResultSetId resultSetId) throws HyracksDataException { + IResultSetReader reader = null; + try { + reader = new ResultSetReader(resultDirectory, netManager, resultClientCtx, jobId, resultSetId); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + return reader; + } + + static class ResultClientContext extends FrameManager implements IHyracksCommonContext { + + ResultClientContext(int frameSize) { + super(frameSize); + } + + @Override + public IIOManager getIoManager() { + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java new file mode 100644 index 0000000..092d860 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.client.result; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.channels.IInputChannel; +import org.apache.hyracks.api.channels.IInputChannelMonitor; +import org.apache.hyracks.api.comm.FrameHelper; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.result.ResultJobRecord.Status; +import org.apache.hyracks.api.result.IResultDirectory; +import org.apache.hyracks.api.result.IResultSetReader; +import org.apache.hyracks.api.result.ResultDirectoryRecord; +import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.client.net.ClientNetworkManager; +import org.apache.hyracks.comm.channels.ResultNetworkInputChannel; +import org.apache.hyracks.util.annotations.NotThreadSafe; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +@NotThreadSafe +public class ResultSetReader implements IResultSetReader { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final int NUM_READ_BUFFERS = 1; + private final IResultDirectory resultDirectory; + private final ClientNetworkManager netManager; + private final IHyracksCommonContext resultClientCtx; + private final JobId jobId; + private final ResultSetId resultSetId; + private ResultDirectoryRecord[] knownRecords; + private ResultInputChannelMonitor[] monitors; + private ResultInputChannelMonitor currentRecordMonitor; + private ResultNetworkInputChannel currentRecordChannel; + private int currentRecord; + + public ResultSetReader(IResultDirectory resultDirectory, ClientNetworkManager netManager, + IHyracksCommonContext resultClientCtx, JobId jobId, ResultSetId resultSetId) { + this.resultDirectory = resultDirectory; + this.netManager = netManager; + this.resultClientCtx = resultClientCtx; + this.jobId = jobId; + this.resultSetId = resultSetId; + currentRecord = -1; + } + + @Override + public Status getResultStatus() { + try { + return resultDirectory.getResultStatus(jobId, resultSetId); + } catch (HyracksDataException e) { + if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) { + LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e); + } + } catch (Exception e) { + LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e); + } + return null; + } + + @Override + public int read(IFrame frame) throws HyracksDataException { + frame.reset(); + int readSize = 0; + if (isFirstRead() && !hasNextRecord()) { + return readSize; + } + // read until frame is full or all result records have been read + while (readSize < frame.getFrameSize()) { + if (currentRecordMonitor.hasMoreFrames()) { + final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer(); + if (readBuffer == null) { + throw new IllegalStateException("Unexpected empty frame"); + } + currentRecordMonitor.notifyFrameRead(); + if (readSize == 0) { + final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer); + frame.ensureFrameSize(frame.getMinSize() * nBlocks); + frame.getBuffer().clear(); + } + frame.getBuffer().put(readBuffer); + currentRecordChannel.recycleBuffer(readBuffer); + readSize = frame.getBuffer().position(); + } else { + currentRecordChannel.close(); + if (currentRecordMonitor.failed()) { + throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId); + } + if (isLastRecord() || !hasNextRecord()) { + break; + } + } + } + frame.getBuffer().flip(); + return readSize; + } + + private SocketAddress getSocketAddress(ResultDirectoryRecord record) throws HyracksDataException { + try { + final NetworkAddress netAddr = record.getNetworkAddress(); + return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort()); + } catch (UnknownHostException e) { + throw HyracksDataException.create(e); + } + } + + private ResultInputChannelMonitor getMonitor(int partition) { + if (knownRecords == null || knownRecords[partition] == null) { + throw new IllegalStateException("Accessing monitors before obtaining the corresponding addresses"); + } + if (monitors == null) { + monitors = new ResultInputChannelMonitor[knownRecords.length]; + } + if (monitors[partition] == null) { + monitors[partition] = new ResultInputChannelMonitor(); + } + return monitors[partition]; + } + + private boolean hasNextRecord() throws HyracksDataException { + currentRecord++; + ResultDirectoryRecord record = getRecord(currentRecord); + // skip empty records + while (record.isEmpty() && ++currentRecord < knownRecords.length) { + record = getRecord(currentRecord); + } + if (currentRecord == knownRecords.length) { + // exhausted all known records + return false; + } + requestRecordData(record); + return true; + } + + private ResultDirectoryRecord getRecord(int recordNum) throws HyracksDataException { + try { + while (knownRecords == null || knownRecords[recordNum] == null) { + knownRecords = resultDirectory.getResultLocations(jobId, resultSetId, knownRecords); + } + return knownRecords[recordNum]; + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + private void requestRecordData(ResultDirectoryRecord record) throws HyracksDataException { + currentRecordChannel = new ResultNetworkInputChannel(netManager, getSocketAddress(record), jobId, resultSetId, + currentRecord, NUM_READ_BUFFERS); + currentRecordMonitor = getMonitor(currentRecord); + currentRecordChannel.registerMonitor(currentRecordMonitor); + currentRecordChannel.open(resultClientCtx); + } + + private boolean isFirstRead() { + return currentRecord == -1; + } + + private boolean isLastRecord() { + return knownRecords != null && currentRecord == knownRecords.length - 1; + } + + private static class ResultInputChannelMonitor implements IInputChannelMonitor { + + private int availableFrames; + private boolean eos; + private boolean failed; + + ResultInputChannelMonitor() { + eos = false; + failed = false; + } + + @Override + public synchronized void notifyFailure(IInputChannel channel, int errorCode) { + failed = true; + notifyAll(); + } + + @Override + public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) { + availableFrames += nFrames; + notifyAll(); + } + + @Override + public synchronized void notifyEndOfStream(IInputChannel channel) { + eos = true; + notifyAll(); + } + + synchronized boolean failed() { + return failed; + } + + synchronized void notifyFrameRead() { + availableFrames--; + notifyAll(); + } + + synchronized boolean hasMoreFrames() throws HyracksDataException { + while (!failed && !eos && availableFrames == 0) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + } + return !failed && !isFullyConsumed(); + } + + private synchronized boolean isFullyConsumed() { + return availableFrames == 0 && eos; + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java deleted file mode 100644 index 0f96a6e..0000000 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.comm.channels; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.Queue; - -import org.apache.hyracks.api.channels.IInputChannel; -import org.apache.hyracks.api.channels.IInputChannelMonitor; -import org.apache.hyracks.api.comm.IBufferAcceptor; -import org.apache.hyracks.api.comm.IChannelControlBlock; -import org.apache.hyracks.api.comm.ICloseableBufferAcceptor; -import org.apache.hyracks.api.context.IHyracksCommonContext; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobId; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class DatasetNetworkInputChannel implements IInputChannel { - private static final Logger LOGGER = LogManager.getLogger(); - - static final int INITIAL_MESSAGE_SIZE = 20; - - private final IChannelConnectionFactory netManager; - - private final SocketAddress remoteAddress; - - private final JobId jobId; - - private final ResultSetId resultSetId; - - private final int partition; - - private final Queue<ByteBuffer> fullQueue; - - private final int nBuffers; - - private IChannelControlBlock ccb; - - private IInputChannelMonitor monitor; - - private Object attachment; - - public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId, - ResultSetId resultSetId, int partition, int nBuffers) { - this.netManager = netManager; - this.remoteAddress = remoteAddress; - this.jobId = jobId; - this.resultSetId = resultSetId; - this.partition = partition; - fullQueue = new ArrayDeque<ByteBuffer>(nBuffers); - this.nBuffers = nBuffers; - } - - @Override - public void registerMonitor(IInputChannelMonitor monitor) { - this.monitor = monitor; - } - - @Override - public void setAttachment(Object attachment) { - this.attachment = attachment; - } - - @Override - public Object getAttachment() { - return attachment; - } - - @Override - public synchronized ByteBuffer getNextBuffer() { - return fullQueue.poll(); - } - - @Override - public void recycleBuffer(ByteBuffer buffer) { - buffer.clear(); - ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer); - } - - @Override - public void open(IHyracksCommonContext ctx) throws HyracksDataException { - try { - ccb = netManager.connect(remoteAddress); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor()); - ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor()); - for (int i = 0; i < nBuffers; ++i) { - ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame()); - } - ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE); - writeBuffer.putLong(jobId.getId()); - writeBuffer.putLong(resultSetId.getId()); - writeBuffer.putInt(partition); - writeBuffer.flip(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Sending partition request for JobId: " + jobId + " partition: " + partition + " on channel: " - + ccb); - } - ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer); - ccb.getWriteInterface().getFullBufferAcceptor().close(); - } - - @Override - public void close() throws HyracksDataException { - - } - - private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor { - @Override - public void accept(ByteBuffer buffer) { - fullQueue.add(buffer); - monitor.notifyDataAvailability(DatasetNetworkInputChannel.this, 1); - } - - @Override - public void close() { - monitor.notifyEndOfStream(DatasetNetworkInputChannel.this); - } - - @Override - public void error(int ecode) { - monitor.notifyFailure(DatasetNetworkInputChannel.this, ecode); - } - } - - private class WriteEmptyBufferAcceptor implements IBufferAcceptor { - @Override - public void accept(ByteBuffer buffer) { - // do nothing - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java new file mode 100644 index 0000000..37540cd --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.comm.channels; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Queue; + +import org.apache.hyracks.api.channels.IInputChannel; +import org.apache.hyracks.api.channels.IInputChannelMonitor; +import org.apache.hyracks.api.comm.IBufferAcceptor; +import org.apache.hyracks.api.comm.IChannelControlBlock; +import org.apache.hyracks.api.comm.ICloseableBufferAcceptor; +import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.result.ResultSetId; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ResultNetworkInputChannel implements IInputChannel { + private static final Logger LOGGER = LogManager.getLogger(); + + static final int INITIAL_MESSAGE_SIZE = 20; + + private final IChannelConnectionFactory netManager; + + private final SocketAddress remoteAddress; + + private final JobId jobId; + + private final ResultSetId resultSetId; + + private final int partition; + + private final Queue<ByteBuffer> fullQueue; + + private final int nBuffers; + + private IChannelControlBlock ccb; + + private IInputChannelMonitor monitor; + + private Object attachment; + + public ResultNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId, + ResultSetId resultSetId, int partition, int nBuffers) { + this.netManager = netManager; + this.remoteAddress = remoteAddress; + this.jobId = jobId; + this.resultSetId = resultSetId; + this.partition = partition; + fullQueue = new ArrayDeque<ByteBuffer>(nBuffers); + this.nBuffers = nBuffers; + } + + @Override + public void registerMonitor(IInputChannelMonitor monitor) { + this.monitor = monitor; + } + + @Override + public void setAttachment(Object attachment) { + this.attachment = attachment; + } + + @Override + public Object getAttachment() { + return attachment; + } + + @Override + public synchronized ByteBuffer getNextBuffer() { + return fullQueue.poll(); + } + + @Override + public void recycleBuffer(ByteBuffer buffer) { + buffer.clear(); + ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer); + } + + @Override + public void open(IHyracksCommonContext ctx) throws HyracksDataException { + try { + ccb = netManager.connect(remoteAddress); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor()); + ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor()); + for (int i = 0; i < nBuffers; ++i) { + ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame()); + } + ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE); + writeBuffer.putLong(jobId.getId()); + writeBuffer.putLong(resultSetId.getId()); + writeBuffer.putInt(partition); + writeBuffer.flip(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sending partition request for JobId: " + jobId + " partition: " + partition + " on channel: " + + ccb); + } + ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer); + ccb.getWriteInterface().getFullBufferAcceptor().close(); + } + + @Override + public void close() throws HyracksDataException { + + } + + private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor { + @Override + public void accept(ByteBuffer buffer) { + fullQueue.add(buffer); + monitor.notifyDataAvailability(ResultNetworkInputChannel.this, 1); + } + + @Override + public void close() { + monitor.notifyEndOfStream(ResultNetworkInputChannel.this); + } + + @Override + public void error(int ecode) { + monitor.notifyFailure(ResultNetworkInputChannel.this, ecode); + } + } + + private class WriteEmptyBufferAcceptor implements IBufferAcceptor { + @Override + public void accept(ByteBuffer buffer) { + // do nothing + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index a669402..2edbab8 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -20,21 +20,21 @@ package org.apache.hyracks.control.cc; import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions; import org.apache.hyracks.api.comm.NetworkAddress; -import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.DeployedJobSpecIdFactory; import org.apache.hyracks.api.job.JobIdFactory; import org.apache.hyracks.api.job.JobInfo; +import org.apache.hyracks.api.result.ResultJobRecord.Status; import org.apache.hyracks.control.cc.work.CancelJobWork; import org.apache.hyracks.control.cc.work.CliDeployBinaryWork; import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork; import org.apache.hyracks.control.cc.work.ClusterShutdownWork; import org.apache.hyracks.control.cc.work.DeployJobSpecWork; -import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork; import org.apache.hyracks.control.cc.work.GetJobInfoWork; import org.apache.hyracks.control.cc.work.GetJobStatusWork; import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork; import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork; +import org.apache.hyracks.control.cc.work.GetResultDirectoryAddressWork; import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork; import org.apache.hyracks.control.cc.work.GetResultStatusWork; import org.apache.hyracks.control.cc.work.GetThreadDumpWork; @@ -121,19 +121,19 @@ class ClientInterfaceIPCI implements IIPCI { ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(), jobIdFactory, sjf.getJobParameters(), new IPCResponder<>(handle, mid), id)); break; - case GET_DATASET_DIRECTORY_SERIVICE_INFO: + case GET_RESULT_DIRECTORY_ADDRESS: ccs.getWorkQueue().schedule( - new GetDatasetDirectoryServiceInfoWork(ccs, new IPCResponder<NetworkAddress>(handle, mid))); + new GetResultDirectoryAddressWork(ccs, new IPCResponder<NetworkAddress>(handle, mid))); break; - case GET_DATASET_RESULT_STATUS: - HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrsf = - (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn; + case GET_RESULT_STATUS: + HyracksClientInterfaceFunctions.GetResultStatusFunction gdrsf = + (HyracksClientInterfaceFunctions.GetResultStatusFunction) fn; ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(), gdrsf.getResultSetId(), new IPCResponder<Status>(handle, mid))); break; - case GET_DATASET_RESULT_LOCATIONS: - HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = - (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn; + case GET_RESULT_LOCATIONS: + HyracksClientInterfaceFunctions.GetResultLocationsFunction gdrlf = + (HyracksClientInterfaceFunctions.GetResultLocationsFunction) fn; ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs, gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(), new IPCResponder<>(handle, mid))); break;