Repository: asterixdb Updated Branches: refs/heads/master d57d81ff9 -> a85f4121f
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java index dde3bad..fa6580e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java @@ -45,7 +45,7 @@ public class NodeManagerTest { @Test public void testNormal() throws HyracksException { IResourceManager resourceManager = new ResourceManager(); - INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager); + INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager); NodeControllerState ncState1 = mockNodeControllerState(NODE1, false); NodeControllerState ncState2 = mockNodeControllerState(NODE2, false); @@ -70,7 +70,7 @@ public class NodeManagerTest { @Test public void testException() throws HyracksException { IResourceManager resourceManager = new ResourceManager(); - INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager); + INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager); NodeControllerState ncState1 = mockNodeControllerState(NODE1, true); boolean invalidNetworkAddress = false; @@ -89,7 +89,7 @@ public class NodeManagerTest { @Test public void testNullNode() throws HyracksException { IResourceManager resourceManager = new ResourceManager(); - INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager); + INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager); boolean invalidParameter = false; // Verifies states after a failure during adding nodes. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java index 3bc549e..1d506ca 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java @@ -59,10 +59,13 @@ public class ExceptionUtils { List<Exception> newExceptions = new ArrayList<>(); for (Exception e : exceptions) { if (e instanceof HyracksDataException) { - newExceptions.add(HyracksDataException.create((HyracksDataException) e, nodeId)); + if (((HyracksDataException) e).getNodeId() == null) { + newExceptions.add(HyracksDataException.create((HyracksDataException) e, nodeId)); + } else { + newExceptions.add(e); + } } else { - newExceptions.add(new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e.getMessage(), - e, nodeId)); + newExceptions.add(new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e, nodeId)); } } exceptions.clear(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index fc911c0..b1f39f7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -179,8 +179,8 @@ public class NodeControllerService implements IControllerService { this.application = application; id = ncConfig.getNodeId(); - ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), - application.getFileDeviceResolver()); + ioManager = + new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver()); if (id == null) { throw new HyracksException("id not set"); } @@ -274,8 +274,7 @@ public class NodeControllerService implements IControllerService { partitionManager = new PartitionManager(this); netManager = new NetworkManager(ncConfig.getDataListenAddress(), ncConfig.getDataListenPort(), partitionManager, ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), ncConfig.getDataPublicAddress(), - ncConfig.getDataPublicPort(), - FullFrameChannelInterfaceFactory.INSTANCE); + ncConfig.getDataPublicPort(), FullFrameChannelInterfaceFactory.INSTANCE); netManager.start(); startApplication(); @@ -288,16 +287,17 @@ public class NodeControllerService implements IControllerService { this.ccs = new ClusterControllerRemoteProxy(ipc, new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()), ncConfig.getClusterConnectRetries(), new IControllerRemoteProxyIPCEventListener() { - @Override - public void ipcHandleRestored(IIPCHandle handle) throws IPCException { - // we need to re-register in case the NC -> CC connection reset was due to CC shutdown - try { - registerNode(); - } catch (Exception e) { - throw new IPCException(e); - } - } - }); + @Override + public void ipcHandleRestored(IIPCHandle handle) throws IPCException { + // we need to re-register in case of NC -> CC connection reset + try { + registerNode(); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Failed Registering with cc", e); + throw new IPCException(e); + } + } + }); registerNode(); workQueue.start(); @@ -332,15 +332,15 @@ public class NodeControllerService implements IControllerService { // Use "public" versions of network addresses and ports NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress(); NetworkAddress netAddress = netManager.getPublicNetworkAddress(); - NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() - : null; + NetworkAddress meesagingPort = + messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null; int allCores = osMXBean.getAvailableProcessors(); nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress, - osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores, - runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), - runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), - runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort, - application.getCapacity(), PidHelper.getPid(), maxJobId.get()); + osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(), + runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), + runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(), + runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort, application.getCapacity(), + PidHelper.getPid(), maxJobId.get()); ccs.registerNode(nodeRegistration); @@ -572,12 +572,12 @@ public class NodeControllerService implements IControllerService { private static INCApplication getApplication(NCConfig config) throws ClassNotFoundException, IllegalAccessException, InstantiationException { - if (config.getAppClass() != null) { - Class<?> c = Class.forName(config.getAppClass()); - return (INCApplication) c.newInstance(); - } else { - return BaseNCApplication.INSTANCE; - } + if (config.getAppClass() != null) { + Class<?> c = Class.forName(config.getAppClass()); + return (INCApplication) c.newInstance(); + } else { + return BaseNCApplication.INSTANCE; + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java index 361ee37..962d541 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java @@ -44,7 +44,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager { private final Executor executor; - private final Map<JobId, IDatasetStateRecord> partitionResultStateMap; + private final Map<JobId, ResultSetMap> partitionResultStateMap; private final DefaultDeallocatableRegistry deallocatableRegistry; @@ -76,8 +76,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager { dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions, datasetMemoryManager, fileFactory); - ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.computeIfAbsent(jobId, - k -> new ResultSetMap()); + ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap()); ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions); resultStates[partition] = dpw.getResultState(); @@ -122,7 +121,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager { protected synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException { - ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId); + ResultSetMap rsIdMap = partitionResultStateMap.get(jobId); if (rsIdMap == null) { throw new HyracksException("Unknown JobId " + jobId); } @@ -139,7 +138,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager { @Override public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) { - ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId); + ResultSetMap rsIdMap = partitionResultStateMap.get(jobId); if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, partition)) { partitionResultStateMap.remove(jobId); } @@ -147,13 +146,20 @@ public class DatasetPartitionManager implements IDatasetPartitionManager { @Override public synchronized void abortReader(JobId jobId) { - ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId); + ResultSetMap rsIdMap = partitionResultStateMap.get(jobId); if (rsIdMap != null) { rsIdMap.abortAll(); } } @Override + public synchronized void abortAllReaders() { + for (ResultSetMap rsIdMap : partitionResultStateMap.values()) { + rsIdMap.abortAll(); + } + } + + @Override public synchronized void close() { for (JobId jobId : getJobIds()) { deinit(jobId); @@ -167,7 +173,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager { } @Override - public IDatasetStateRecord getState(JobId jobId) { + public ResultSetMap getState(JobId jobId) { return partitionResultStateMap.get(jobId); } @@ -187,7 +193,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager { } private synchronized void deinit(JobId jobId) { - ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId); + ResultSetMap rsIdMap = partitionResultStateMap.get(jobId); if (rsIdMap != null) { rsIdMap.closeAndDeleteAll(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java index fd434d7..16e5027 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java @@ -39,9 +39,7 @@ public class PipelinedPartition implements IFrameWriter, IPartition { private IFrameWriter delegate; - private boolean pendingConnection; - - private boolean failed; + private volatile boolean pendingConnection = true; public PipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid, TaskAttemptId taId) { this.ctx = ctx; @@ -74,16 +72,13 @@ public class PipelinedPartition implements IFrameWriter, IPartition { @Override public void open() throws HyracksDataException { manager.registerPartition(pid, taId, this, PartitionState.STARTED, false); - failed = false; pendingConnection = true; ensureConnected(); } @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - if (!failed) { - delegate.nextFrame(buffer); - } + delegate.nextFrame(buffer); } private void ensureConnected() throws HyracksDataException { @@ -93,7 +88,8 @@ public class PipelinedPartition implements IFrameWriter, IPartition { try { wait(); } catch (InterruptedException e) { - throw new HyracksDataException(e); + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } } } @@ -104,22 +100,21 @@ public class PipelinedPartition implements IFrameWriter, IPartition { @Override public void fail() throws HyracksDataException { - failed = true; - if (delegate != null) { + if (!pendingConnection) { delegate.fail(); } } @Override public void close() throws HyracksDataException { - if (!failed) { + if (!pendingConnection) { delegate.close(); } } @Override public void flush() throws HyracksDataException { - if (!failed) { + if (!pendingConnection) { delegate.flush(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java new file mode 100644 index 0000000..4fb4bf6 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.nc.work; + +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.api.dataflow.TaskAttemptId; +import org.apache.hyracks.api.dataset.IDatasetPartitionManager; +import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.hyracks.control.nc.Joblet; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.control.nc.Task; + +public class AbortAllTasksWork extends SynchronizableWork { + + private static final Logger LOGGER = Logger.getLogger(AbortAllTasksWork.class.getName()); + private final NodeControllerService ncs; + + public AbortAllTasksWork(NodeControllerService ncs) { + this.ncs = ncs; + } + + @Override + protected void doRun() throws Exception { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Aborting all tasks"); + } + IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager(); + if (dpm != null) { + ncs.getDatasetPartitionManager().abortAllReaders(); + } + for (Joblet ji : ncs.getJobletMap().values()) { + Map<TaskAttemptId, Task> taskMap = ji.getTaskMap(); + for (Task task : taskMap.values()) { + if (task != null) { + task.abort(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java index 07e1ad2..5870e76 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java @@ -65,8 +65,8 @@ public class AbortTasksWork extends AbstractWork { } } } else { - LOGGER.log(Level.WARNING, "Joblet couldn't be found. Tasks of job " + jobId - + " have all either completed or failed"); + LOGGER.log(Level.WARNING, + "Joblet couldn't be found. Tasks of job " + jobId + " have all either completed or failed"); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/FrameDebugUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/FrameDebugUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/FrameDebugUtils.java index 66e7ae0..d09b890 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/FrameDebugUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/FrameDebugUtils.java @@ -39,11 +39,12 @@ public class FrameDebugUtils { /** * Debugging method + * * @param fta * @param recordDescriptor * @param prefix */ - public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String prefix) { + public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String prefix) { try (ByteBufferInputStream bbis = new ByteBufferInputStream(); DataInputStream dis = new DataInputStream(bbis)) { int tc = fta.getTupleCount(); @@ -60,19 +61,21 @@ public class FrameDebugUtils { /** * Debugging method + * * @param fta * @param recordDescriptor */ - public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor) { + public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor) { prettyPrint(fta, recordDescriptor, ""); } /** * Debugging method + * * @param fta * @param operator */ - public void prettyPrintTags(IFrameTupleAccessor fta, String operator) { + public static void prettyPrintTags(IFrameTupleAccessor fta, String operator) { try (ByteBufferInputStream bbis = new ByteBufferInputStream(); DataInputStream dis = new DataInputStream(bbis)) { int tc = fta.getTupleCount(); @@ -90,14 +93,15 @@ public class FrameDebugUtils { /** * Debugging method + * * @param fta * @param tid * @param bbis * @param dis * @param sb */ - protected void prettyPrintTag(IFrameTupleAccessor fta, int tid, ByteBufferInputStream bbis, DataInputStream dis, - StringBuilder sb) { + protected static void prettyPrintTag(IFrameTupleAccessor fta, int tid, ByteBufferInputStream bbis, + DataInputStream dis, StringBuilder sb) { sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")["); for (int j = 0; j < fta.getFieldCount(); ++j) { sb.append(" "); @@ -115,6 +119,7 @@ public class FrameDebugUtils { /** * Debugging method + * * @param fta * @param recordDescriptor * @param tid @@ -122,9 +127,8 @@ public class FrameDebugUtils { * @param dis * @param sb */ - protected void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid, - ByteBufferInputStream bbis, DataInputStream dis, - StringBuilder sb) { + protected static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid, + ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) { sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")["); for (int j = 0; j < fta.getFieldCount(); ++j) { sb.append(" "); @@ -133,8 +137,8 @@ public class FrameDebugUtils { } sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") "); sb.append("{"); - bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength() + fta - .getFieldStartOffset(tid, j)); + bbis.setByteBuffer(fta.getBuffer(), + fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength() + fta.getFieldStartOffset(tid, j)); try { sb.append(recordDescriptor.getFields()[j].deserialize(dis)); } catch (Exception e) { @@ -146,14 +150,14 @@ public class FrameDebugUtils { sb.append("\n"); } - /** * Debugging method + * * @param fta * @param recordDescriptor * @param tid */ - public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid) { + public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid) { try (ByteBufferInputStream bbis = new ByteBufferInputStream(); DataInputStream dis = new DataInputStream(bbis)) { StringBuilder sb = new StringBuilder(); @@ -169,13 +173,14 @@ public class FrameDebugUtils { * They are safe as they don't print records. Printing records * using IserializerDeserializer can print incorrect results or throw exceptions. * A better way yet would be to use record pointable. + * * @param fta * @param recordDescriptor * @param prefix * @param recordFields * @throws IOException */ - public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String prefix, + public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String prefix, int[] recordFields) throws IOException { try (ByteBufferInputStream bbis = new ByteBufferInputStream(); DataInputStream dis = new DataInputStream(bbis)) { @@ -191,14 +196,15 @@ public class FrameDebugUtils { /** * Debugging method + * * @param fta * @param recordDescriptor * @param tIdx * @param recordFields * @throws IOException */ - public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tIdx, int[] recordFields) - throws IOException { + public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tIdx, + int[] recordFields) throws IOException { try (ByteBufferInputStream bbis = new ByteBufferInputStream(); DataInputStream dis = new DataInputStream(bbis)) { StringBuilder sb = new StringBuilder(); @@ -209,14 +215,14 @@ public class FrameDebugUtils { /** * Debugging method + * * @param tuple * @param fieldsIdx * @param descIdx * @throws HyracksDataException */ - public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference tuple, - int fieldsIdx, int descIdx) - throws HyracksDataException { + public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference tuple, + int fieldsIdx, int descIdx) throws HyracksDataException { try (ByteBufferInputStream bbis = new ByteBufferInputStream(); DataInputStream dis = new DataInputStream(bbis)) { StringBuilder sb = new StringBuilder(); @@ -237,11 +243,12 @@ public class FrameDebugUtils { /** * Debugging method + * * @param tuple * @param descF * @throws HyracksDataException */ - public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference tuple, + public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference tuple, int[] descF) throws HyracksDataException { try (ByteBufferInputStream bbis = new ByteBufferInputStream(); DataInputStream dis = new DataInputStream(bbis)) { @@ -265,6 +272,7 @@ public class FrameDebugUtils { /** * Debugging method + * * @param fta * @param recordDescriptor * @param tid @@ -274,17 +282,15 @@ public class FrameDebugUtils { * @param recordFields * @throws IOException */ - protected void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid, - ByteBufferInputStream bbis, DataInputStream dis, - StringBuilder sb, - int[] recordFields) throws IOException { + protected static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid, + ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb, int[] recordFields) throws IOException { Arrays.sort(recordFields); sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")["); for (int j = 0; j < fta.getFieldCount(); ++j) { sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") "); sb.append("{"); - bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(tid) + fta - .getFieldSlotsLength() + fta.getFieldStartOffset(tid, j)); + bbis.setByteBuffer(fta.getBuffer(), + fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength() + fta.getFieldStartOffset(tid, j)); if (Arrays.binarySearch(recordFields, j) >= 0) { sb.append("{a record field: only print using pointable:"); sb.append("tag->" + dis.readByte() + "}"); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java index 0cc0170..d7d5c27 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java @@ -123,14 +123,15 @@ public class NonDeterministicChannelReader implements IInputChannelMonitor, IPar try { wait(); } catch (InterruptedException e) { - throw new HyracksDataException(e); + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } } } public synchronized void close() throws HyracksDataException { - for (int i = closedSenders.nextClearBit(0); i >= 0 - && i < nSenderPartitions; i = closedSenders.nextClearBit(i + 1)) { + for (int i = closedSenders.nextClearBit(0); i >= 0 && i < nSenderPartitions; i = + closedSenders.nextClearBit(i + 1)) { if (channels[i] != null) { channels[i].close(); channels[i] = null;
