Repository: asterixdb Updated Branches: refs/heads/master 63c5e2f72 -> 9c02f5e20
[ASTERIXDB-2281][RT] Consider reserved txn ids when determining max Change-Id: I88f14fb351976db239ed752693e59882da62d588 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2368 Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Michael Blow <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/9c02f5e2 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/9c02f5e2 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/9c02f5e2 Branch: refs/heads/master Commit: 9c02f5e205c6b899ec0de16c99b9bc40f9d33346 Parents: 63c5e2f Author: Michael Blow <[email protected]> Authored: Wed Feb 7 23:28:13 2018 -0500 Committer: Michael Blow <[email protected]> Committed: Wed Feb 7 21:53:51 2018 -0800 ---------------------------------------------------------------------- .../asterix/app/nc/NCAppRuntimeContext.java | 10 ++ .../hyracks/bootstrap/CCApplication.java | 2 +- .../asterix/messaging/NCMessageBroker.java | 10 +- .../common/api/INcApplicationContext.java | 2 + .../common/transactions/ITxnIdFactory.java | 9 ++ .../asterix/metadata/BulkTxnIdFactory.java | 47 --------- .../asterix/metadata/CachingTxnIdFactory.java | 102 +++++++++++++++++++ .../asterix/metadata/MetadataManager.java | 68 ++++++------- .../apache/asterix/metadata/MetadataNode.java | 19 ++-- .../asterix/metadata/api/IMetadataManager.java | 2 + .../asterix/metadata/api/IMetadataNode.java | 25 +---- .../message/ReportLocalCountersMessage.java | 2 +- .../message/ResourceIdRequestMessage.java | 14 +-- .../runtime/message/TxnIdBlockRequest.java | 99 ++++++++++++++++++ .../runtime/message/TxnIdBlockResponse.java | 52 ++++++++++ .../asterix/runtime/utils/BulkTxnIdFactory.java | 53 ++++++++++ .../runtime/utils/CcApplicationContext.java | 7 +- .../asterix/runtime/utils/CcTxnIdFactory.java | 86 ---------------- .../api/exceptions/HyracksDataException.java | 17 +--- .../control/common/base/IClusterController.java | 1 - .../control/common/ipc/CCNCFunctions.java | 1 + .../ipc/ClusterControllerRemoteProxy.java | 1 - 22 files changed, 392 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index d42db39..6e25856 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -513,4 +513,14 @@ public class NCAppRuntimeContext implements INcApplicationContext { public ICoordinationService getCoordinationService() { return NoOpCoordinationService.INSTANCE; } + + @Override + public long getMaxTxnId() { + if (txnSubsystem == null) { + throw new IllegalStateException("cannot determine max txn id before txnSubsystem is initialized!"); + } + + return Math.max(MetadataManager.INSTANCE == null ? 0 : MetadataManager.INSTANCE.getMaxTxnId(), + txnSubsystem.getTransactionManager().getMaxTxnId()); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 551e6aa..699892e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -178,7 +178,7 @@ public class CCApplication extends BaseCCApplication { throws AlgebricksException, IOException { return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE, globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider, - new MetadataLockManager(), MetadataManager::getTxnIdBlockFactory); + new MetadataLockManager()); } protected GlobalRecoveryManager createGlobalRecoveryManager() throws Exception { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java index 7a74940..e4fe4f3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java @@ -95,7 +95,13 @@ public class NCMessageBroker implements INCMessageBroker { if (LOGGER.isInfoEnabled()) { LOGGER.info("Received message: " + absMessage); } - absMessage.handle(appContext); + ncs.getExecutor().submit(() -> { + try { + absMessage.handle(appContext); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Could not process message: {}", message, e); + } + }); } public ConcurrentFramePool getMessagingFramePool() { @@ -105,7 +111,7 @@ public class NCMessageBroker implements INCMessageBroker { private void sendMessageToChannel(IChannelControlBlock ccb, INcAddressedMessage msg) throws IOException { byte[] serializedMsg = JavaSerializationUtils.serialize(msg); if (serializedMsg.length > maxMsgSize) { - throw new HyracksDataException("Message exceded maximum size"); + throw new HyracksDataException("Message exceeded maximum size"); } // Prepare the message buffer ByteBuffer msgBuffer = messagingFramePool.get(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java index 82936b3..19b4d61 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java @@ -124,4 +124,6 @@ public interface INcApplicationContext extends IApplicationContext { IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider(); IReplicaManager getReplicaManager(); + + long getMaxTxnId(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java index 3c60432..be4a1f8 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java @@ -28,6 +28,8 @@ public interface ITxnIdFactory { */ TxnId create() throws AlgebricksException; + long getIdBlock(int blockSize); + /** * Ensure that future transaction ids are larger than the supplied id * @@ -35,4 +37,11 @@ public interface ITxnIdFactory { * the value to ensure future created transaction ids are larger than */ void ensureMinimumId(long id) throws AlgebricksException; + + /** + * The highest transaction id this factory has created + * + * @return the max transaction id + */ + long getMaxTxnId(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java deleted file mode 100644 index 8ac6b63..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.metadata; - -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.asterix.common.transactions.ITxnIdFactory; -import org.apache.asterix.common.transactions.TxnId; - -class BulkTxnIdFactory implements ITxnIdFactory { - - private final AtomicLong maxId = new AtomicLong(); - - @Override - public TxnId create() { - return new TxnId(maxId.incrementAndGet()); - } - - public long reserveIdBlock(int blockSize) { - if (blockSize < 1) { - throw new IllegalArgumentException("block size cannot be smaller than 1, but was " + blockSize); - } - return maxId.getAndAdd(blockSize) + 1; - } - - @Override - public void ensureMinimumId(long id) { - this.maxId.getAndUpdate(next -> Math.max(next, id)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java new file mode 100644 index 0000000..d44bb13 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.transactions.ITxnIdFactory; +import org.apache.asterix.common.transactions.TxnId; +import org.apache.asterix.runtime.message.TxnIdBlockRequest; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Represents a factory to generate unique transaction IDs. + */ +class CachingTxnIdFactory implements ITxnIdFactory { + private static final Logger LOGGER = LogManager.getLogger(); + + private final INcApplicationContext appCtx; + private volatile Block block = new Block(0, 0); + + public CachingTxnIdFactory(INcApplicationContext appCtx) { + this.appCtx = appCtx; + } + + @Override + public TxnId create() throws AlgebricksException { + while (true) { + try { + return new TxnId(block.nextId()); + } catch (BlockExhaustedException ex) { + // retry + LOGGER.info("block exhausted; obtaining new block from supplier"); + TxnIdBlockRequest.Block newBlock; + try { + newBlock = TxnIdBlockRequest.send(appCtx); + } catch (HyracksDataException e) { + throw new AlgebricksException(e); + } + block = new Block(newBlock.getStartingId(), newBlock.getBlockSize()); + } + } + } + + @Override + public void ensureMinimumId(long id) throws AlgebricksException { + throw new UnsupportedOperationException(); + } + + @Override + public long getIdBlock(int blockSize) { + throw new UnsupportedOperationException(); + } + + @Override + public long getMaxTxnId() { + return block.endExclusive - 1; + } + + static class Block { + private static final BlockExhaustedException BLOCK_EXHAUSTED_EXCEPTION = new BlockExhaustedException(); + private final AtomicLong id; + private final long start; + private final long endExclusive; + + private Block(long start, long blockSize) { + this.id = new AtomicLong(start); + this.start = start; + this.endExclusive = start + blockSize; + } + + private long nextId() throws BlockExhaustedException { + long nextId = id.incrementAndGet(); + if (nextId >= endExclusive && (endExclusive >= start || nextId < start)) { + throw BLOCK_EXHAUSTED_EXCEPTION; + } + return nextId; + } + } + + private static class BlockExhaustedException extends Exception { + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java index b4b304e..b2757f2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java @@ -35,7 +35,7 @@ import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.common.transactions.ILongBlockFactory; +import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.metadata.api.IAsterixStateProxy; @@ -92,7 +92,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; * cluster, i.e., metadata transaction ids shall never "accidentally" overlap * with transaction ids of regular jobs or other metadata transactions. */ -public abstract class MetadataManager implements IMetadataManager, ILongBlockFactory { +public abstract class MetadataManager implements IMetadataManager { private final MetadataCache cache = new MetadataCache(); protected final Collection<IAsterixStateProxy> proxies; protected IMetadataNode metadataNode; @@ -119,13 +119,19 @@ public abstract class MetadataManager implements IMetadataManager, ILongBlockFac this.metadataLatch = new ReentrantReadWriteLock(true); } + protected abstract TxnId createTxnId(); + @Override public void init() throws HyracksDataException { // no op } @Override - public abstract MetadataTransactionContext beginTransaction() throws RemoteException, ACIDException; + public MetadataTransactionContext beginTransaction() throws RemoteException { + TxnId txnId = createTxnId(); + metadataNode.beginTransaction(txnId); + return new MetadataTransactionContext(txnId); + } @Override public void commitTransaction(MetadataTransactionContext ctx) throws RemoteException, ACIDException { @@ -997,34 +1003,6 @@ public abstract class MetadataManager implements IMetadataManager, ILongBlockFac rebindMetadataNode = true; } - @Override - public void ensureMinimum(long value) throws AlgebricksException { - try { - metadataNode.ensureMinimumTxnId(value); - } catch (RemoteException e) { - throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); - } - } - - @Override - public long getBlock(int blockSize) throws AlgebricksException { - try { - return metadataNode.reserveTxnIdBlock(blockSize); - } catch (RemoteException e) { - throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); - } - } - - public static ILongBlockFactory getTxnIdBlockFactory() { - try { - INSTANCE.init(); - } catch (HyracksDataException e) { - throw new IllegalStateException(e); - } - return (ILongBlockFactory) INSTANCE; - - } - public static void initialize(IAsterixStateProxy proxy, MetadataProperties metadataProperties, ICcApplicationContext appCtx) { INSTANCE = new CCMetadataManagerImpl(proxy, metadataProperties, appCtx); @@ -1046,15 +1024,19 @@ public abstract class MetadataManager implements IMetadataManager, ILongBlockFac } @Override - public MetadataTransactionContext beginTransaction() throws RemoteException { + protected TxnId createTxnId() { TxnId txnId; try { txnId = appCtx.getTxnIdFactory().create(); } catch (AlgebricksException e) { throw new ACIDException(e); } - metadataNode.beginTransaction(txnId); - return new MetadataTransactionContext(txnId); + return txnId; + } + + @Override + public long getMaxTxnId() { + return appCtx.getTxnIdFactory().getMaxTxnId(); } @Override @@ -1083,15 +1065,25 @@ public abstract class MetadataManager implements IMetadataManager, ILongBlockFac } private static class NCMetadataManagerImpl extends MetadataManager { + private final ITxnIdFactory txnIdFactory; + NCMetadataManagerImpl(Collection<IAsterixStateProxy> proxies, MetadataNode metadataNode) { super(proxies, metadataNode); + txnIdFactory = metadataNode.getTxnIdFactory(); + } + + @Override + protected TxnId createTxnId() { + try { + return txnIdFactory.create(); + } catch (AlgebricksException e) { + throw new ACIDException(e); + } } @Override - public MetadataTransactionContext beginTransaction() throws RemoteException { - TxnId txnId = new TxnId(metadataNode.reserveTxnIdBlock(1)); - metadataNode.beginTransaction(txnId); - return new MetadataTransactionContext(txnId); + public long getMaxTxnId() { + return txnIdFactory.getMaxTxnId(); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index fd21941..681bae7 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -39,6 +39,7 @@ import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel; import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.common.transactions.ImmutableDatasetId; import org.apache.asterix.common.transactions.TransactionOptions; import org.apache.asterix.common.transactions.TxnId; @@ -132,7 +133,7 @@ public class MetadataNode implements IMetadataNode { private IDatasetLifecycleManager datasetLifecycleManager; private ITransactionSubsystem transactionSubsystem; private int metadataStoragePartition; - private transient BulkTxnIdFactory txnIdFactory; + private transient CachingTxnIdFactory txnIdFactory; // core only private transient MetadataTupleTranslatorProvider tupleTranslatorProvider; // extension only @@ -159,7 +160,7 @@ public class MetadataNode implements IMetadataNode { } } } - this.txnIdFactory = new BulkTxnIdFactory(); + this.txnIdFactory = new CachingTxnIdFactory(runtimeContext); } public int getMetadataStoragePartition() { @@ -167,16 +168,6 @@ public class MetadataNode implements IMetadataNode { } @Override - public void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException { - txnIdFactory.ensureMinimumId(maxId); - } - - @Override - public long reserveTxnIdBlock(int blockSize) throws ACIDException, RemoteException { - return txnIdFactory.reserveIdBlock(blockSize); - } - - @Override public void beginTransaction(TxnId transactionId) throws ACIDException, RemoteException { TransactionOptions options = new TransactionOptions(AtomicityLevel.ATOMIC); transactionSubsystem.getTransactionManager().beginTransaction(transactionId, options); @@ -2009,4 +2000,8 @@ public class MetadataNode implements IMetadataNode { throw new AlgebricksException(e); } } + + public ITxnIdFactory getTxnIdFactory() { + return txnIdFactory; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java index b2d0d3e..e030db3 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java @@ -724,4 +724,6 @@ public interface IMetadataManager extends IMetadataBootstrap { List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, String dataverseName, String feedName) throws AlgebricksException; + + long getMaxTxnId(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java index c3f9d7f..f6abc53 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java @@ -26,7 +26,6 @@ import java.util.List; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.common.transactions.ITxnIdBlockProvider; import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.metadata.entities.CompactionPolicy; @@ -52,28 +51,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; * lock/access metadata shall always go through the MetadataManager, and should * never call methods on the MetadataNode directly for any reason. */ -public interface IMetadataNode extends Remote, Serializable, ITxnIdBlockProvider { - - /** - * Allocates a block of transaction ids of specified block size - * - * @param maxId - * The txn id to ensure future txn ids are larger than - * @throws ACIDException - * @throws RemoteException - */ - void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException; - - /** - * Allocates a block of transaction ids of specified block size - * - * @param blockSize - * The size of the transaction id block to reserve - * @return the start of the reserved block - * @throws ACIDException - * @throws RemoteException - */ - long reserveTxnIdBlock(int blockSize) throws ACIDException, RemoteException; +public interface IMetadataNode extends Remote, Serializable { /** * Begins a local transaction against the metadata. @@ -828,5 +806,4 @@ public interface IMetadataNode extends Remote, Serializable, ITxnIdBlockProvider List<FeedConnection> getFeedConnections(TxnId txnId, String dataverseName, String feedName) throws AlgebricksException, RemoteException; - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java index db2a044..fe9a5b8 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java @@ -65,7 +65,7 @@ public class ReportLocalCountersMessage implements ICcAddressedMessage { INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext(); long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(), MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); - long maxTxnId = appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId(); + long maxTxnId = appContext.getMaxTxnId(); long maxJobId = ncs.getMaxJobId(ccId); ReportLocalCountersMessage countersMessage = new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId, maxJobId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index a2f4aa1..3e172c7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -39,20 +39,20 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage { public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { try { ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); - ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage(); + ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage(); IClusterStateManager clusterStateManager = appCtx.getClusterStateManager(); if (!clusterStateManager.isClusterActive()) { - reponse.setResourceId(-1); - reponse.setException(new Exception("Cannot generate global resource id when cluster is not active.")); + response.setResourceId(-1); + response.setException(new Exception("Cannot generate global resource id when cluster is not active.")); } else { IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); - reponse.setResourceId(resourceIdManager.createResourceId()); - if (reponse.getResourceId() < 0) { - reponse.setException(new Exception("One or more nodes has not reported max resource id.")); + response.setResourceId(resourceIdManager.createResourceId()); + if (response.getResourceId() < 0) { + response.setException(new Exception("One or more nodes has not reported max resource id.")); } requestMaxResourceID(clusterStateManager, resourceIdManager, broker); } - broker.sendApplicationMessageToNC(reponse, src); + broker.sendApplicationMessageToNC(response, src); } catch (Exception e) { throw HyracksDataException.create(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java new file mode 100644 index 0000000..b8578ec --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.message; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class TxnIdBlockRequest implements ICcAddressedMessage { + private static final Logger LOGGER = LogManager.getLogger(); + private static final int BLOCK_SIZE = 100; + private static final long serialVersionUID = 1L; + + private static BlockingQueue<TxnIdBlockResponse> blockQueue = new LinkedBlockingQueue<>(); + private final String nodeId; + private final int blockSizeRequested; + + public TxnIdBlockRequest(String nodeId, int blockSizeRequested) { + this.nodeId = nodeId; + this.blockSizeRequested = blockSizeRequested; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException { + try { + ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + long startingId = appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested); + TxnIdBlockResponse response = new TxnIdBlockResponse(startingId, blockSizeRequested); + broker.sendApplicationMessageToNC(response, nodeId); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + public static Block send(INcApplicationContext ncs) throws HyracksDataException { + TxnIdBlockRequest blockRequestMessage = new TxnIdBlockRequest(ncs.getServiceContext().getNodeId(), BLOCK_SIZE); + try { + ((INCMessageBroker) ncs.getServiceContext().getMessageBroker()).sendMessageToPrimaryCC(blockRequestMessage); + TxnIdBlockResponse response = blockQueue.take(); + return new Block(response.getStartingId(), response.getBlockSize()); + } catch (Exception e) { + LOGGER.log(Level.ERROR, "Unable to request transaction id block", e); + throw HyracksDataException.create(e); + } + } + + static void addResponse(TxnIdBlockResponse response) { + blockQueue.offer(response); + } + + @Override + public String toString() { + return TxnIdBlockRequest.class.getSimpleName(); + } + + public static class Block { + + private final long startingId; + private final int blockSize; + + public Block(long startingId, int blockSize) { + this.startingId = startingId; + this.blockSize = blockSize; + } + + public long getStartingId() { + return startingId; + } + + public int getBlockSize() { + return blockSize; + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java new file mode 100644 index 0000000..46d742a --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class TxnIdBlockResponse implements INcAddressedMessage { + private static final long serialVersionUID = 1L; + private final long startingId; + private final int blockSize; + + public TxnIdBlockResponse(long startingId, int blockSize) { + this.startingId = startingId; + this.blockSize = blockSize; + } + + public long getStartingId() { + return startingId; + } + + public int getBlockSize() { + return blockSize; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + TxnIdBlockRequest.addResponse(this); + } + + @Override + public String toString() { + return TxnIdBlockResponse.class.getSimpleName(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java new file mode 100644 index 0000000..542bc17 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.utils; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.asterix.common.transactions.ITxnIdFactory; +import org.apache.asterix.common.transactions.TxnId; + +class BulkTxnIdFactory implements ITxnIdFactory { + + private final AtomicLong maxId = new AtomicLong(); + + @Override + public TxnId create() { + return new TxnId(maxId.incrementAndGet()); + } + + @Override + public long getIdBlock(int blockSize) { + if (blockSize < 1) { + throw new IllegalArgumentException("block size cannot be smaller than 1, but was " + blockSize); + } + return maxId.getAndAdd(blockSize) + 1; + } + + @Override + public void ensureMinimumId(long id) { + this.maxId.getAndUpdate(next -> Math.max(next, id)); + } + + @Override + public long getMaxTxnId() { + return maxId.get(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index b83df6c..4157e16 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -24,7 +24,6 @@ import java.util.function.Supplier; import org.apache.asterix.common.api.ICoordinationService; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.api.INodeJobTracker; -import org.apache.asterix.common.transactions.ILongBlockFactory; import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; @@ -93,8 +92,7 @@ public class CcApplicationContext implements ICcApplicationContext { ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider, - IMetadataLockManager mdLockManager, Supplier<ILongBlockFactory> txnIdBlockSupplier) - throws AlgebricksException, IOException { + IMetadataLockManager mdLockManager) throws AlgebricksException, IOException { this.ccServiceCtx = ccServiceCtx; this.hcc = hcc; this.libraryManager = libraryManager; @@ -122,7 +120,8 @@ public class CcApplicationContext implements ICcApplicationContext { clusterStateManager.setCcAppCtx(this); this.resourceIdManager = new ResourceIdManager(clusterStateManager); nodeJobTracker = new NodeJobTracker(); - txnIdFactory = new CcTxnIdFactory(txnIdBlockSupplier); + txnIdFactory = new BulkTxnIdFactory(); + } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java deleted file mode 100644 index 82bbe6b..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.utils; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; - -import org.apache.asterix.common.transactions.ILongBlockFactory; -import org.apache.asterix.common.transactions.ITxnIdFactory; -import org.apache.asterix.common.transactions.TxnId; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * Represents a factory to generate unique transaction IDs. - */ -class CcTxnIdFactory implements ITxnIdFactory { - private static final int TXN_BLOCK_SIZE = 1024; - private static final Logger LOGGER = LogManager.getLogger(); - - private final Supplier<ILongBlockFactory> blockFactorySupplier; - private volatile Block block = new Block(0, 0); - - public CcTxnIdFactory(Supplier<ILongBlockFactory> blockFactorySupplier) { - this.blockFactorySupplier = blockFactorySupplier; - } - - @Override - public TxnId create() throws AlgebricksException { - while (true) { - try { - return new TxnId(block.nextId()); - } catch (BlockExhaustedException ex) { - // retry - LOGGER.info("block exhausted; obtaining new block from supplier"); - block = new Block(blockFactorySupplier.get().getBlock(TXN_BLOCK_SIZE), TXN_BLOCK_SIZE); - } - } - } - - @Override - public void ensureMinimumId(long id) throws AlgebricksException { - blockFactorySupplier.get().ensureMinimum(id); - } - - static class Block { - private static final BlockExhaustedException BLOCK_EXHAUSTED_EXCEPTION = new BlockExhaustedException(); - private final AtomicLong id; - private final long start; - private final long endExclusive; - - private Block(long start, long blockSize) { - this.id = new AtomicLong(start); - this.start = start; - this.endExclusive = start + blockSize; - } - - private long nextId() throws BlockExhaustedException { - long nextId = id.incrementAndGet(); - if (nextId >= endExclusive && (endExclusive >= start || nextId < start)) { - throw BLOCK_EXHAUSTED_EXCEPTION; - } - return nextId; - } - } - - private static class BlockExhaustedException extends Exception { - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java index 58b4b27..d0e4655 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java @@ -22,9 +22,6 @@ package org.apache.hyracks.api.exceptions; import java.io.Serializable; import org.apache.hyracks.api.util.ErrorMessageUtil; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; /** * The main execution time exception type for runtime errors in a hyracks environment @@ -32,7 +29,6 @@ import org.apache.logging.log4j.Logger; public class HyracksDataException extends HyracksException { private static final long serialVersionUID = 1L; - private static final Logger LOGGER = LogManager.getLogger(); public static HyracksDataException create(Throwable cause) { if (cause instanceof HyracksDataException || cause == null) { @@ -40,11 +36,8 @@ public class HyracksDataException extends HyracksException { } else if (cause instanceof Error) { // don't wrap errors, allow them to propagate throw (Error) cause; - } else if (cause instanceof InterruptedException && !Thread.currentThread().isInterrupted()) { - // TODO(mblow): why not force interrupt on current thread? - LOGGER.log(Level.WARN, - "Wrapping an InterruptedException in HyracksDataException and current thread is not interrupted", - cause); + } else if (cause instanceof InterruptedException) { + Thread.currentThread().interrupt(); } return new HyracksDataException(cause); } @@ -65,10 +58,8 @@ public class HyracksDataException extends HyracksException { // don't suppress errors into a HyracksDataException, allow them to propagate th.addSuppressed(root); throw (Error) th; - } else if (th instanceof InterruptedException && !Thread.currentThread().isInterrupted()) { - // TODO(mblow): why not force interrupt on current thread? - LOGGER.log(Level.WARN, "Suppressing an InterruptedException in a HyracksDataException and current " - + "thread is not interrupted", th); + } else if (th instanceof InterruptedException) { + Thread.currentThread().interrupt(); } root.addSuppressed(th); return root; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index a95ae3d..5e3c3d4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -21,7 +21,6 @@ package org.apache.hyracks.control.common.base; import java.util.List; import org.apache.hyracks.api.comm.NetworkAddress; -import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.deployment.DeploymentId; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 5c6d078..3d505f3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -1314,6 +1314,7 @@ public class CCNCFunctions { } public static class ShutdownResponseFunction extends Function { + private static final long serialVersionUID = 1L; private final String nodeId; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index ae40ea3..027316e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -21,7 +21,6 @@ package org.apache.hyracks.control.common.ipc; import java.util.List; import org.apache.hyracks.api.comm.NetworkAddress; -import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.deployment.DeploymentId;
