Repository: asterixdb
Updated Branches:
  refs/heads/master 63c5e2f72 -> 9c02f5e20


[ASTERIXDB-2281][RT] Consider reserved txn ids when determining max

Change-Id: I88f14fb351976db239ed752693e59882da62d588
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2368
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Michael Blow <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/9c02f5e2
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/9c02f5e2
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/9c02f5e2

Branch: refs/heads/master
Commit: 9c02f5e205c6b899ec0de16c99b9bc40f9d33346
Parents: 63c5e2f
Author: Michael Blow <[email protected]>
Authored: Wed Feb 7 23:28:13 2018 -0500
Committer: Michael Blow <[email protected]>
Committed: Wed Feb 7 21:53:51 2018 -0800

----------------------------------------------------------------------
 .../asterix/app/nc/NCAppRuntimeContext.java     |  10 ++
 .../hyracks/bootstrap/CCApplication.java        |   2 +-
 .../asterix/messaging/NCMessageBroker.java      |  10 +-
 .../common/api/INcApplicationContext.java       |   2 +
 .../common/transactions/ITxnIdFactory.java      |   9 ++
 .../asterix/metadata/BulkTxnIdFactory.java      |  47 ---------
 .../asterix/metadata/CachingTxnIdFactory.java   | 102 +++++++++++++++++++
 .../asterix/metadata/MetadataManager.java       |  68 ++++++-------
 .../apache/asterix/metadata/MetadataNode.java   |  19 ++--
 .../asterix/metadata/api/IMetadataManager.java  |   2 +
 .../asterix/metadata/api/IMetadataNode.java     |  25 +----
 .../message/ReportLocalCountersMessage.java     |   2 +-
 .../message/ResourceIdRequestMessage.java       |  14 +--
 .../runtime/message/TxnIdBlockRequest.java      |  99 ++++++++++++++++++
 .../runtime/message/TxnIdBlockResponse.java     |  52 ++++++++++
 .../asterix/runtime/utils/BulkTxnIdFactory.java |  53 ++++++++++
 .../runtime/utils/CcApplicationContext.java     |   7 +-
 .../asterix/runtime/utils/CcTxnIdFactory.java   |  86 ----------------
 .../api/exceptions/HyracksDataException.java    |  17 +---
 .../control/common/base/IClusterController.java |   1 -
 .../control/common/ipc/CCNCFunctions.java       |   1 +
 .../ipc/ClusterControllerRemoteProxy.java       |   1 -
 22 files changed, 392 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index d42db39..6e25856 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -513,4 +513,14 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     public ICoordinationService getCoordinationService() {
         return NoOpCoordinationService.INSTANCE;
     }
+
+    @Override
+    public long getMaxTxnId() {
+        if (txnSubsystem == null) {
+            throw new IllegalStateException("cannot determine max txn id 
before txnSubsystem is initialized!");
+        }
+
+        return Math.max(MetadataManager.INSTANCE == null ? 0 : 
MetadataManager.INSTANCE.getMaxTxnId(),
+                txnSubsystem.getTransactionManager().getMaxTxnId());
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 551e6aa..699892e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -178,7 +178,7 @@ public class CCApplication extends BaseCCApplication {
             throws AlgebricksException, IOException {
         return new CcApplicationContext(ccServiceCtx, getHcc(), 
libraryManager, () -> MetadataManager.INSTANCE,
                 globalRecoveryManager, lifecycleCoordinator, new 
ActiveNotificationHandler(), componentProvider,
-                new MetadataLockManager(), 
MetadataManager::getTxnIdBlockFactory);
+                new MetadataLockManager());
     }
 
     protected GlobalRecoveryManager createGlobalRecoveryManager() throws 
Exception {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 7a74940..e4fe4f3 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -95,7 +95,13 @@ public class NCMessageBroker implements INCMessageBroker {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Received message: " + absMessage);
         }
-        absMessage.handle(appContext);
+        ncs.getExecutor().submit(() -> {
+            try {
+                absMessage.handle(appContext);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARN, "Could not process message: {}", 
message, e);
+            }
+        });
     }
 
     public ConcurrentFramePool getMessagingFramePool() {
@@ -105,7 +111,7 @@ public class NCMessageBroker implements INCMessageBroker {
     private void sendMessageToChannel(IChannelControlBlock ccb, 
INcAddressedMessage msg) throws IOException {
         byte[] serializedMsg = JavaSerializationUtils.serialize(msg);
         if (serializedMsg.length > maxMsgSize) {
-            throw new HyracksDataException("Message exceded maximum size");
+            throw new HyracksDataException("Message exceeded maximum size");
         }
         // Prepare the message buffer
         ByteBuffer msgBuffer = messagingFramePool.get();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 82936b3..19b4d61 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -124,4 +124,6 @@ public interface INcApplicationContext extends 
IApplicationContext {
     IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
 
     IReplicaManager getReplicaManager();
+
+    long getMaxTxnId();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
index 3c60432..be4a1f8 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
@@ -28,6 +28,8 @@ public interface ITxnIdFactory {
      */
     TxnId create() throws AlgebricksException;
 
+    long getIdBlock(int blockSize);
+
     /**
      * Ensure that future transaction ids are larger than the supplied id
      *
@@ -35,4 +37,11 @@ public interface ITxnIdFactory {
      *            the value to ensure future created transaction ids are 
larger than
      */
     void ensureMinimumId(long id) throws AlgebricksException;
+
+    /**
+     * The highest transaction id this factory has created
+     *
+     * @return the max transaction id
+     */
+    long getMaxTxnId();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
deleted file mode 100644
index 8ac6b63..0000000
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.metadata;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.asterix.common.transactions.ITxnIdFactory;
-import org.apache.asterix.common.transactions.TxnId;
-
-class BulkTxnIdFactory implements ITxnIdFactory {
-
-    private final AtomicLong maxId = new AtomicLong();
-
-    @Override
-    public TxnId create() {
-        return new TxnId(maxId.incrementAndGet());
-    }
-
-    public long reserveIdBlock(int blockSize) {
-        if (blockSize < 1) {
-            throw new IllegalArgumentException("block size cannot be smaller 
than 1, but was " + blockSize);
-        }
-        return maxId.getAndAdd(blockSize) + 1;
-    }
-
-    @Override
-    public void ensureMinimumId(long id) {
-        this.maxId.getAndUpdate(next -> Math.max(next, id));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
new file mode 100644
index 0000000..d44bb13
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.runtime.message.TxnIdBlockRequest;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Represents a factory to generate unique transaction IDs.
+ */
+class CachingTxnIdFactory implements ITxnIdFactory {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final INcApplicationContext appCtx;
+    private volatile Block block = new Block(0, 0);
+
+    public CachingTxnIdFactory(INcApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+
+    @Override
+    public TxnId create() throws AlgebricksException {
+        while (true) {
+            try {
+                return new TxnId(block.nextId());
+            } catch (BlockExhaustedException ex) {
+                // retry
+                LOGGER.info("block exhausted; obtaining new block from 
supplier");
+                TxnIdBlockRequest.Block newBlock;
+                try {
+                    newBlock = TxnIdBlockRequest.send(appCtx);
+                } catch (HyracksDataException e) {
+                    throw new AlgebricksException(e);
+                }
+                block = new Block(newBlock.getStartingId(), 
newBlock.getBlockSize());
+            }
+        }
+    }
+
+    @Override
+    public void ensureMinimumId(long id) throws AlgebricksException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getIdBlock(int blockSize) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMaxTxnId() {
+        return block.endExclusive - 1;
+    }
+
+    static class Block {
+        private static final BlockExhaustedException BLOCK_EXHAUSTED_EXCEPTION 
= new BlockExhaustedException();
+        private final AtomicLong id;
+        private final long start;
+        private final long endExclusive;
+
+        private Block(long start, long blockSize) {
+            this.id = new AtomicLong(start);
+            this.start = start;
+            this.endExclusive = start + blockSize;
+        }
+
+        private long nextId() throws BlockExhaustedException {
+            long nextId = id.incrementAndGet();
+            if (nextId >= endExclusive && (endExclusive >= start || nextId < 
start)) {
+                throw BLOCK_EXHAUSTED_EXCEPTION;
+            }
+            return nextId;
+        }
+    }
+
+    private static class BlockExhaustedException extends Exception {
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index b4b304e..b2757f2 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -35,7 +35,7 @@ import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -92,7 +92,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
  * cluster, i.e., metadata transaction ids shall never "accidentally" overlap
  * with transaction ids of regular jobs or other metadata transactions.
  */
-public abstract class MetadataManager implements IMetadataManager, 
ILongBlockFactory {
+public abstract class MetadataManager implements IMetadataManager {
     private final MetadataCache cache = new MetadataCache();
     protected final Collection<IAsterixStateProxy> proxies;
     protected IMetadataNode metadataNode;
@@ -119,13 +119,19 @@ public abstract class MetadataManager implements 
IMetadataManager, ILongBlockFac
         this.metadataLatch = new ReentrantReadWriteLock(true);
     }
 
+    protected abstract TxnId createTxnId();
+
     @Override
     public void init() throws HyracksDataException {
         // no op
     }
 
     @Override
-    public abstract MetadataTransactionContext beginTransaction() throws 
RemoteException, ACIDException;
+    public MetadataTransactionContext beginTransaction() throws 
RemoteException {
+        TxnId txnId = createTxnId();
+        metadataNode.beginTransaction(txnId);
+        return new MetadataTransactionContext(txnId);
+    }
 
     @Override
     public void commitTransaction(MetadataTransactionContext ctx) throws 
RemoteException, ACIDException {
@@ -997,34 +1003,6 @@ public abstract class MetadataManager implements 
IMetadataManager, ILongBlockFac
         rebindMetadataNode = true;
     }
 
-    @Override
-    public void ensureMinimum(long value) throws AlgebricksException {
-        try {
-            metadataNode.ensureMinimumTxnId(value);
-        } catch (RemoteException e) {
-            throw new 
MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
-        }
-    }
-
-    @Override
-    public long getBlock(int blockSize) throws AlgebricksException {
-        try {
-            return metadataNode.reserveTxnIdBlock(blockSize);
-        } catch (RemoteException e) {
-            throw new 
MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
-        }
-    }
-
-    public static ILongBlockFactory getTxnIdBlockFactory() {
-        try {
-            INSTANCE.init();
-        } catch (HyracksDataException e) {
-            throw new IllegalStateException(e);
-        }
-        return (ILongBlockFactory) INSTANCE;
-
-    }
-
     public static void initialize(IAsterixStateProxy proxy, MetadataProperties 
metadataProperties,
             ICcApplicationContext appCtx) {
         INSTANCE = new CCMetadataManagerImpl(proxy, metadataProperties, 
appCtx);
@@ -1046,15 +1024,19 @@ public abstract class MetadataManager implements 
IMetadataManager, ILongBlockFac
         }
 
         @Override
-        public MetadataTransactionContext beginTransaction() throws 
RemoteException {
+        protected TxnId createTxnId() {
             TxnId txnId;
             try {
                 txnId = appCtx.getTxnIdFactory().create();
             } catch (AlgebricksException e) {
                 throw new ACIDException(e);
             }
-            metadataNode.beginTransaction(txnId);
-            return new MetadataTransactionContext(txnId);
+            return txnId;
+        }
+
+        @Override
+        public long getMaxTxnId() {
+            return appCtx.getTxnIdFactory().getMaxTxnId();
         }
 
         @Override
@@ -1083,15 +1065,25 @@ public abstract class MetadataManager implements 
IMetadataManager, ILongBlockFac
     }
 
     private static class NCMetadataManagerImpl extends MetadataManager {
+        private final ITxnIdFactory txnIdFactory;
+
         NCMetadataManagerImpl(Collection<IAsterixStateProxy> proxies, 
MetadataNode metadataNode) {
             super(proxies, metadataNode);
+            txnIdFactory = metadataNode.getTxnIdFactory();
+        }
+
+        @Override
+        protected TxnId createTxnId() {
+            try {
+                return txnIdFactory.create();
+            } catch (AlgebricksException e) {
+                throw new ACIDException(e);
+            }
         }
 
         @Override
-        public MetadataTransactionContext beginTransaction() throws 
RemoteException {
-            TxnId txnId = new TxnId(metadataNode.reserveTxnIdBlock(1));
-            metadataNode.beginTransaction(txnId);
-            return new MetadataTransactionContext(txnId);
+        public long getMaxTxnId() {
+            return txnIdFactory.getMaxTxnId();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index fd21941..681bae7 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -39,6 +39,7 @@ import 
org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import 
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
@@ -132,7 +133,7 @@ public class MetadataNode implements IMetadataNode {
     private IDatasetLifecycleManager datasetLifecycleManager;
     private ITransactionSubsystem transactionSubsystem;
     private int metadataStoragePartition;
-    private transient BulkTxnIdFactory txnIdFactory;
+    private transient CachingTxnIdFactory txnIdFactory;
     // core only
     private transient MetadataTupleTranslatorProvider tupleTranslatorProvider;
     // extension only
@@ -159,7 +160,7 @@ public class MetadataNode implements IMetadataNode {
                 }
             }
         }
-        this.txnIdFactory = new BulkTxnIdFactory();
+        this.txnIdFactory = new CachingTxnIdFactory(runtimeContext);
     }
 
     public int getMetadataStoragePartition() {
@@ -167,16 +168,6 @@ public class MetadataNode implements IMetadataNode {
     }
 
     @Override
-    public void ensureMinimumTxnId(long maxId) throws ACIDException, 
RemoteException {
-        txnIdFactory.ensureMinimumId(maxId);
-    }
-
-    @Override
-    public long reserveTxnIdBlock(int blockSize) throws ACIDException, 
RemoteException {
-        return txnIdFactory.reserveIdBlock(blockSize);
-    }
-
-    @Override
     public void beginTransaction(TxnId transactionId) throws ACIDException, 
RemoteException {
         TransactionOptions options = new 
TransactionOptions(AtomicityLevel.ATOMIC);
         
transactionSubsystem.getTransactionManager().beginTransaction(transactionId, 
options);
@@ -2009,4 +2000,8 @@ public class MetadataNode implements IMetadataNode {
             throw new AlgebricksException(e);
         }
     }
+
+    public ITxnIdFactory getTxnIdFactory() {
+        return txnIdFactory;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index b2d0d3e..e030db3 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -724,4 +724,6 @@ public interface IMetadataManager extends 
IMetadataBootstrap {
 
     List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, 
String dataverseName, String feedName)
             throws AlgebricksException;
+
+    long getMaxTxnId();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index c3f9d7f..f6abc53 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -26,7 +26,6 @@ import java.util.List;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.transactions.ITxnIdBlockProvider;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -52,28 +51,7 @@ import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
  * lock/access metadata shall always go through the MetadataManager, and should
  * never call methods on the MetadataNode directly for any reason.
  */
-public interface IMetadataNode extends Remote, Serializable, 
ITxnIdBlockProvider {
-
-    /**
-     * Allocates a block of transaction ids of specified block size
-     *
-     * @param maxId
-     *            The txn id to ensure future txn ids are larger than
-     * @throws ACIDException
-     * @throws RemoteException
-     */
-    void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException;
-
-    /**
-     * Allocates a block of transaction ids of specified block size
-     *
-     * @param blockSize
-     *            The size of the transaction id block to reserve
-     * @return the start of the reserved block
-     * @throws ACIDException
-     * @throws RemoteException
-     */
-    long reserveTxnIdBlock(int blockSize) throws ACIDException, 
RemoteException;
+public interface IMetadataNode extends Remote, Serializable {
 
     /**
      * Begins a local transaction against the metadata.
@@ -828,5 +806,4 @@ public interface IMetadataNode extends Remote, 
Serializable, ITxnIdBlockProvider
 
     List<FeedConnection> getFeedConnections(TxnId txnId, String dataverseName, 
String feedName)
             throws AlgebricksException, RemoteException;
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
index db2a044..fe9a5b8 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -65,7 +65,7 @@ public class ReportLocalCountersMessage implements 
ICcAddressedMessage {
         INcApplicationContext appContext = (INcApplicationContext) 
ncs.getApplicationContext();
         long maxResourceId = 
Math.max(appContext.getLocalResourceRepository().maxId(),
                 
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
-        long maxTxnId = 
appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId();
+        long maxTxnId = appContext.getMaxTxnId();
         long maxJobId = ncs.getMaxJobId(ccId);
         ReportLocalCountersMessage countersMessage =
                 new ReportLocalCountersMessage(ncs.getId(), maxResourceId, 
maxTxnId, maxJobId);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index a2f4aa1..3e172c7 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -39,20 +39,20 @@ public class ResourceIdRequestMessage implements 
ICcAddressedMessage {
     public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
         try {
             ICCMessageBroker broker = (ICCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
-            ResourceIdRequestResponseMessage reponse = new 
ResourceIdRequestResponseMessage();
+            ResourceIdRequestResponseMessage response = new 
ResourceIdRequestResponseMessage();
             IClusterStateManager clusterStateManager = 
appCtx.getClusterStateManager();
             if (!clusterStateManager.isClusterActive()) {
-                reponse.setResourceId(-1);
-                reponse.setException(new Exception("Cannot generate global 
resource id when cluster is not active."));
+                response.setResourceId(-1);
+                response.setException(new Exception("Cannot generate global 
resource id when cluster is not active."));
             } else {
                 IResourceIdManager resourceIdManager = 
appCtx.getResourceIdManager();
-                reponse.setResourceId(resourceIdManager.createResourceId());
-                if (reponse.getResourceId() < 0) {
-                    reponse.setException(new Exception("One or more nodes has 
not reported max resource id."));
+                response.setResourceId(resourceIdManager.createResourceId());
+                if (response.getResourceId() < 0) {
+                    response.setException(new Exception("One or more nodes has 
not reported max resource id."));
                 }
                 requestMaxResourceID(clusterStateManager, resourceIdManager, 
broker);
             }
-            broker.sendApplicationMessageToNC(reponse, src);
+            broker.sendApplicationMessageToNC(response, src);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
new file mode 100644
index 0000000..b8578ec
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class TxnIdBlockRequest implements ICcAddressedMessage {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int BLOCK_SIZE = 100;
+    private static final long serialVersionUID = 1L;
+
+    private static BlockingQueue<TxnIdBlockResponse> blockQueue = new 
LinkedBlockingQueue<>();
+    private final String nodeId;
+    private final int blockSizeRequested;
+
+    public TxnIdBlockRequest(String nodeId, int blockSizeRequested) {
+        this.nodeId = nodeId;
+        this.blockSizeRequested = blockSizeRequested;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException {
+        try {
+            ICCMessageBroker broker = (ICCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+            long startingId = 
appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested);
+            TxnIdBlockResponse response = new TxnIdBlockResponse(startingId, 
blockSizeRequested);
+            broker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static Block send(INcApplicationContext ncs) throws 
HyracksDataException {
+        TxnIdBlockRequest blockRequestMessage = new 
TxnIdBlockRequest(ncs.getServiceContext().getNodeId(), BLOCK_SIZE);
+        try {
+            ((INCMessageBroker) 
ncs.getServiceContext().getMessageBroker()).sendMessageToPrimaryCC(blockRequestMessage);
+            TxnIdBlockResponse response = blockQueue.take();
+            return new Block(response.getStartingId(), 
response.getBlockSize());
+        } catch (Exception e) {
+            LOGGER.log(Level.ERROR, "Unable to request transaction id block", 
e);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    static void addResponse(TxnIdBlockResponse response) {
+        blockQueue.offer(response);
+    }
+
+    @Override
+    public String toString() {
+        return TxnIdBlockRequest.class.getSimpleName();
+    }
+
+    public static class Block {
+
+        private final long startingId;
+        private final int blockSize;
+
+        public Block(long startingId, int blockSize) {
+            this.startingId = startingId;
+            this.blockSize = blockSize;
+        }
+
+        public long getStartingId() {
+            return startingId;
+        }
+
+        public int getBlockSize() {
+            return blockSize;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java
new file mode 100644
index 0000000..46d742a
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TxnIdBlockResponse implements INcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+    private final long startingId;
+    private final int blockSize;
+
+    public TxnIdBlockResponse(long startingId, int blockSize) {
+        this.startingId = startingId;
+        this.blockSize = blockSize;
+    }
+
+    public long getStartingId() {
+        return startingId;
+    }
+
+    public int getBlockSize() {
+        return blockSize;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        TxnIdBlockRequest.addResponse(this);
+    }
+
+    @Override
+    public String toString() {
+        return TxnIdBlockResponse.class.getSimpleName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
new file mode 100644
index 0000000..542bc17
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.common.transactions.TxnId;
+
+class BulkTxnIdFactory implements ITxnIdFactory {
+
+    private final AtomicLong maxId = new AtomicLong();
+
+    @Override
+    public TxnId create() {
+        return new TxnId(maxId.incrementAndGet());
+    }
+
+    @Override
+    public long getIdBlock(int blockSize) {
+        if (blockSize < 1) {
+            throw new IllegalArgumentException("block size cannot be smaller 
than 1, but was " + blockSize);
+        }
+        return maxId.getAndAdd(blockSize) + 1;
+    }
+
+    @Override
+    public void ensureMinimumId(long id) {
+        this.maxId.getAndUpdate(next -> Math.max(next, id));
+    }
+
+    @Override
+    public long getMaxTxnId() {
+        return maxId.get();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index b83df6c..4157e16 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -24,7 +24,6 @@ import java.util.function.Supplier;
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.api.INodeJobTracker;
-import org.apache.asterix.common.transactions.ILongBlockFactory;
 import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
@@ -93,8 +92,7 @@ public class CcApplicationContext implements 
ICcApplicationContext {
             ILibraryManager libraryManager, Supplier<IMetadataBootstrap> 
metadataBootstrapSupplier,
             IGlobalRecoveryManager globalRecoveryManager, 
INcLifecycleCoordinator ftStrategy,
             IJobLifecycleListener activeLifeCycleListener, 
IStorageComponentProvider storageComponentProvider,
-            IMetadataLockManager mdLockManager, Supplier<ILongBlockFactory> 
txnIdBlockSupplier)
-            throws AlgebricksException, IOException {
+            IMetadataLockManager mdLockManager) throws AlgebricksException, 
IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
         this.libraryManager = libraryManager;
@@ -122,7 +120,8 @@ public class CcApplicationContext implements 
ICcApplicationContext {
         clusterStateManager.setCcAppCtx(this);
         this.resourceIdManager = new ResourceIdManager(clusterStateManager);
         nodeJobTracker = new NodeJobTracker();
-        txnIdFactory = new CcTxnIdFactory(txnIdBlockSupplier);
+        txnIdFactory = new BulkTxnIdFactory();
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
deleted file mode 100644
index 82bbe6b..0000000
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.utils;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-
-import org.apache.asterix.common.transactions.ILongBlockFactory;
-import org.apache.asterix.common.transactions.ITxnIdFactory;
-import org.apache.asterix.common.transactions.TxnId;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * Represents a factory to generate unique transaction IDs.
- */
-class CcTxnIdFactory implements ITxnIdFactory {
-    private static final int TXN_BLOCK_SIZE = 1024;
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private final Supplier<ILongBlockFactory> blockFactorySupplier;
-    private volatile Block block = new Block(0, 0);
-
-    public CcTxnIdFactory(Supplier<ILongBlockFactory> blockFactorySupplier) {
-        this.blockFactorySupplier = blockFactorySupplier;
-    }
-
-    @Override
-    public TxnId create() throws AlgebricksException {
-        while (true) {
-            try {
-                return new TxnId(block.nextId());
-            } catch (BlockExhaustedException ex) {
-                // retry
-                LOGGER.info("block exhausted; obtaining new block from 
supplier");
-                block = new 
Block(blockFactorySupplier.get().getBlock(TXN_BLOCK_SIZE), TXN_BLOCK_SIZE);
-            }
-        }
-    }
-
-    @Override
-    public void ensureMinimumId(long id) throws AlgebricksException {
-        blockFactorySupplier.get().ensureMinimum(id);
-    }
-
-    static class Block {
-        private static final BlockExhaustedException BLOCK_EXHAUSTED_EXCEPTION 
= new BlockExhaustedException();
-        private final AtomicLong id;
-        private final long start;
-        private final long endExclusive;
-
-        private Block(long start, long blockSize) {
-            this.id = new AtomicLong(start);
-            this.start = start;
-            this.endExclusive = start + blockSize;
-        }
-
-        private long nextId() throws BlockExhaustedException {
-            long nextId = id.incrementAndGet();
-            if (nextId >= endExclusive && (endExclusive >= start || nextId < 
start)) {
-                throw BLOCK_EXHAUSTED_EXCEPTION;
-            }
-            return nextId;
-        }
-    }
-
-    private static class BlockExhaustedException extends Exception {
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 58b4b27..d0e4655 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -22,9 +22,6 @@ package org.apache.hyracks.api.exceptions;
 import java.io.Serializable;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 /**
  * The main execution time exception type for runtime errors in a hyracks 
environment
@@ -32,7 +29,6 @@ import org.apache.logging.log4j.Logger;
 public class HyracksDataException extends HyracksException {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = LogManager.getLogger();
 
     public static HyracksDataException create(Throwable cause) {
         if (cause instanceof HyracksDataException || cause == null) {
@@ -40,11 +36,8 @@ public class HyracksDataException extends HyracksException {
         } else if (cause instanceof Error) {
             // don't wrap errors, allow them to propagate
             throw (Error) cause;
-        } else if (cause instanceof InterruptedException && 
!Thread.currentThread().isInterrupted()) {
-            // TODO(mblow): why not force interrupt on current thread?
-            LOGGER.log(Level.WARN,
-                    "Wrapping an InterruptedException in HyracksDataException 
and current thread is not interrupted",
-                    cause);
+        } else if (cause instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
         }
         return new HyracksDataException(cause);
     }
@@ -65,10 +58,8 @@ public class HyracksDataException extends HyracksException {
             // don't suppress errors into a HyracksDataException, allow them 
to propagate
             th.addSuppressed(root);
             throw (Error) th;
-        } else if (th instanceof InterruptedException && 
!Thread.currentThread().isInterrupted()) {
-            // TODO(mblow): why not force interrupt on current thread?
-            LOGGER.log(Level.WARN, "Suppressing an InterruptedException in a 
HyracksDataException and current "
-                    + "thread is not interrupted", th);
+        } else if (th instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
         }
         root.addSuppressed(th);
         return root;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index a95ae3d..5e3c3d4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.control.common.base;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 5c6d078..3d505f3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1314,6 +1314,7 @@ public class CCNCFunctions {
     }
 
     public static class ShutdownResponseFunction extends Function {
+        private static final long serialVersionUID = 1L;
 
         private final String nodeId;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9c02f5e2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ae40ea3..027316e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.control.common.ipc;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;

Reply via email to