Repository: asterixdb
Updated Branches:
  refs/heads/master 65b807077 -> 1a3a82123


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
index 0fe55e6..d1385ec 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
@@ -22,6 +22,7 @@ package org.apache.hyracks.control.nc.work;
 import java.net.URL;
 import java.util.List;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -40,11 +41,13 @@ public class DeployBinaryWork extends AbstractWork {
     private DeploymentId deploymentId;
     private NodeControllerService ncs;
     private List<URL> binaryURLs;
+    private final CcId ccId;
 
-    public DeployBinaryWork(NodeControllerService ncs, DeploymentId 
deploymentId, List<URL> binaryURLs) {
+    public DeployBinaryWork(NodeControllerService ncs, DeploymentId 
deploymentId, List<URL> binaryURLs, CcId ccId) {
         this.deploymentId = deploymentId;
         this.ncs = ncs;
         this.binaryURLs = binaryURLs;
+        this.ccId = ccId;
     }
 
     @Override
@@ -59,7 +62,7 @@ public class DeployBinaryWork extends AbstractWork {
             e.printStackTrace();
         }
         try {
-            IClusterController ccs = ncs.getClusterController();
+            IClusterController ccs = ncs.getClusterController(ccId);
             ccs.notifyDeployBinary(deploymentId, ncs.getId(), status);
         } catch (Exception e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
index 4276b67..92612dd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
@@ -19,6 +19,7 @@
 
 package org.apache.hyracks.control.nc.work;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
@@ -34,12 +35,15 @@ public class DeployJobSpecWork extends AbstractWork {
 
     private final NodeControllerService ncs;
     private final byte[] acgBytes;
+    private final CcId ccId;
     private final DeployedJobSpecId deployedJobSpecId;
 
-    public DeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId 
deployedJobSpecId, byte[] acgBytes) {
+    public DeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId 
deployedJobSpecId, byte[] acgBytes,
+            CcId ccId) {
         this.ncs = ncs;
         this.deployedJobSpecId = deployedJobSpecId;
         this.acgBytes = acgBytes;
+        this.ccId = ccId;
     }
 
     @Override
@@ -51,7 +55,7 @@ public class DeployJobSpecWork extends AbstractWork {
             ncs.storeActivityClusterGraph(deployedJobSpecId, acg);
         } catch (HyracksException e) {
             try {
-                
ncs.getClusterController().notifyDeployedJobSpecFailure(deployedJobSpecId, 
ncs.getId());
+                
ncs.getClusterController(ccId).notifyDeployedJobSpecFailure(deployedJobSpecId, 
ncs.getId());
             } catch (Exception e1) {
                 e1.printStackTrace();
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index 449d9a3..614a9e0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -41,8 +41,8 @@ public class NotifyTaskCompleteWork extends AbstractWork {
         TaskProfile taskProfile =
                 new TaskProfile(task.getTaskAttemptId(), 
task.getPartitionSendProfile(), task.getStatsCollector());
         try {
-            
ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(),
 task.getTaskAttemptId(),
-                    ncs.getId(), taskProfile);
+            
ncs.getClusterController(task.getJobletContext().getJobId().getCcId()).notifyTaskComplete(
+                    task.getJobletContext().getJobId(), 
task.getTaskAttemptId(), ncs.getId(), taskProfile);
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Failed notifying task complete for " + 
task.getTaskAttemptId(), e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 1d6ae1b..f0b68a0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -56,7 +56,7 @@ public class NotifyTaskFailureWork extends AbstractWork {
             if (dpm != null) {
                 dpm.abortReader(jobId);
             }
-            ncs.getClusterController().notifyTaskFailure(jobId, taskId, 
ncs.getId(), exceptions);
+            ncs.getClusterController(jobId.getCcId()).notifyTaskFailure(jobId, 
taskId, ncs.getId(), exceptions);
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Failure reporting task failure to cluster 
controller", e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
index 9dbc901..0b3895f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.control.nc.work;
 
 import java.io.ByteArrayOutputStream;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -27,16 +28,19 @@ public class StateDumpWork extends SynchronizableWork {
     private final NodeControllerService ncs;
 
     private final String stateDumpId;
+    private final CcId ccId;
 
-    public StateDumpWork(NodeControllerService ncs, String stateDumpId) {
+    public StateDumpWork(NodeControllerService ncs, String stateDumpId, CcId 
ccId) {
         this.ncs = ncs;
         this.stateDumpId = stateDumpId;
+        this.ccId = ccId;
     }
 
     @Override
     protected void doRun() throws Exception {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ncs.getContext().getStateDumpHandler().dumpState(baos);
-        
ncs.getClusterController().notifyStateDump(ncs.getContext().getNodeId(), 
stateDumpId, baos.toString("UTF-8"));
+        
ncs.getClusterController(ccId).notifyStateDump(ncs.getContext().getNodeId(), 
stateDumpId,
+                baos.toString("UTF-8"));
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
index 1c10589..9b862b2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
@@ -19,6 +19,7 @@
 
 package org.apache.hyracks.control.nc.work;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -35,10 +36,12 @@ public class UnDeployBinaryWork extends AbstractWork {
 
     private DeploymentId deploymentId;
     private NodeControllerService ncs;
+    private final CcId ccId;
 
-    public UnDeployBinaryWork(NodeControllerService ncs, DeploymentId 
deploymentId) {
+    public UnDeployBinaryWork(NodeControllerService ncs, DeploymentId 
deploymentId, CcId ccId) {
         this.deploymentId = deploymentId;
         this.ncs = ncs;
+        this.ccId = ccId;
     }
 
     @Override
@@ -52,7 +55,7 @@ public class UnDeployBinaryWork extends AbstractWork {
             status = DeploymentStatus.FAIL;
         }
         try {
-            IClusterController ccs = ncs.getClusterController();
+            IClusterController ccs = ncs.getClusterController(ccId);
             ccs.notifyDeployBinary(deploymentId, ncs.getId(), status);
         } catch (Exception e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
index 4383ff6..afc4206 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
@@ -19,6 +19,7 @@
 
 package org.apache.hyracks.control.nc.work;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.control.common.work.AbstractWork;
@@ -32,10 +33,12 @@ public class UndeployJobSpecWork extends AbstractWork {
 
     private final NodeControllerService ncs;
     private final DeployedJobSpecId deployedJobSpecId;
+    private final CcId ccId;
 
-    public UndeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId 
deployedJobSpecId) {
+    public UndeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId 
deployedJobSpecId, CcId ccId) {
         this.ncs = ncs;
         this.deployedJobSpecId = deployedJobSpecId;
+        this.ccId = ccId;
     }
 
     @Override
@@ -44,7 +47,7 @@ public class UndeployJobSpecWork extends AbstractWork {
             ncs.removeActivityClusterGraph(deployedJobSpecId);
         } catch (HyracksException e) {
             try {
-                
ncs.getClusterController().notifyDeployedJobSpecFailure(deployedJobSpecId, 
ncs.getId());
+                
ncs.getClusterController(ccId).notifyDeployedJobSpecFailure(deployedJobSpecId, 
ncs.getId());
             } catch (Exception e1) {
                 e1.printStackTrace();
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index 780a65c..15248e7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -22,6 +22,7 @@ import org.apache.hyracks.api.application.INCApplication;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 
@@ -45,7 +46,7 @@ public class TestNCApplication implements INCApplication {
     }
 
     @Override
-    public void onRegisterNode() throws Exception {
+    public void onRegisterNode(CcId ccs) throws Exception {
         // No-op
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 478138e..2aeab81 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -56,12 +56,6 @@ public class JobFailureTest extends 
AbstractMultiNCIntegrationTest {
     }
 
     @Test
-    public void waitForNonExistingJob() throws Exception {
-        JobId jobId = new JobId(Long.MAX_VALUE);
-        waitForCompletion(jobId, "has not been created yet");
-    }
-
-    @Test
     public void failureOnInit() throws Exception {
         JobSpecification spec = new JobSpecification();
         connectToSinkAndRun(spec, new 
FailOnInitializeDeInitializeOperatorDescriptor(spec, true, false),

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index e310385..1d38d3d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -30,7 +30,6 @@ import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatable;
-import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
 import org.apache.hyracks.control.nc.resources.memory.FrameManager;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java
new file mode 100644
index 0000000..b8d8ce4
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class InvokeUtil {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private InvokeUtil() {
+    }
+
+    /**
+     * Executes the passed interruptible, retrying if the operation is 
interrupted. Once the interruptible
+     * completes, the current thread will be re-interrupted, if the original 
operation was interrupted.
+     */
+    public static void doUninterruptibly(Interruptible interruptible) {
+        boolean interrupted = false;
+        try {
+            while (true) {
+                try {
+                    interruptible.run();
+                    break;
+                } catch (InterruptedException e) { // NOSONAR- we will 
re-interrupt the thread during unwind
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Executes the passed interruptible, retrying if the operation is 
interrupted. Once the interruptible
+     * completes, the current thread will be re-interrupted, if the original 
operation was interrupted.
+     */
+    public static void doExUninterruptibly(ThrowingInterruptible 
interruptible) throws Exception {
+        boolean interrupted = false;
+        try {
+            while (true) {
+                try {
+                    interruptible.run();
+                    break;
+                } catch (InterruptedException e) { // NOSONAR- we will 
re-interrupt the thread during unwind
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Executes the passed interruptible, retrying if the operation is 
interrupted.
+     *
+     * @return true if the original operation was interrupted, otherwise false
+     */
+    public static boolean doUninterruptiblyGet(Interruptible interruptible) {
+        boolean interrupted = false;
+        while (true) {
+            try {
+                interruptible.run();
+                break;
+            } catch (InterruptedException e) { // NOSONAR- contract states 
caller must handle
+                interrupted = true;
+            }
+        }
+        return interrupted;
+    }
+
+    /**
+     * Executes the passed interruptible, retrying if the operation is 
interrupted. If the operation throws an
+     * exception after being previously interrupted, the current thread will 
be re-interrupted.
+     *
+     * @return true if the original operation was interrupted, otherwise false
+     */
+    public static boolean doExUninterruptiblyGet(ThrowingInterruptible 
interruptible) throws Exception {
+        boolean interrupted = false;
+        boolean success = false;
+        while (true) {
+            try {
+                interruptible.run();
+                success = true;
+                break;
+            } catch (InterruptedException e) { // NOSONAR- contract states 
caller must handle
+                interrupted = true;
+            } finally {
+                if (!success && interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        return interrupted;
+    }
+
+    public static boolean retryLoop(long duration, TimeUnit durationUnit, long 
delay, TimeUnit delayUnit,
+            Callable<Boolean> function) throws IOException {
+        long endTime = System.nanoTime() + durationUnit.toNanos(duration);
+        boolean first = true;
+        while (endTime - System.nanoTime() > 0) {
+            if (first) {
+                first = false;
+            } else {
+                try {
+                    delayUnit.sleep(delay);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            }
+            try {
+                if (function.call()) {
+                    return true;
+                }
+            } catch (Exception e) {
+                // ignore, retry after delay
+                LOGGER.log(Level.DEBUG, "Ignoring exception on retryLoop 
attempt, will retry after delay", e);
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Executes the passed interruptible, retrying if the operation fails due 
to {@link ClosedByInterruptException} or
+     * {@link InterruptedException}. Once the interruptible completes, the 
current thread will be re-interrupted, if
+     * the original operation was interrupted.
+     */
+    public static void doIoUninterruptibly(ThrowingIOInterruptible 
interruptible) throws IOException {
+        boolean interrupted = false;
+        try {
+            while (true) {
+                try {
+                    interruptible.run();
+                    break;
+                } catch (ClosedByInterruptException | InterruptedException e) {
+                    LOGGER.error("IO operation Interrupted. Retrying..", e);
+                    interrupted = true;
+                    Thread.interrupted();
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @FunctionalInterface
+    public interface Interruptible {
+        void run() throws InterruptedException;
+    }
+
+    @FunctionalInterface
+    public interface ThrowingInterruptible {
+        void run() throws Exception; // NOSONAR
+    }
+
+    @FunctionalInterface
+    public interface ThrowingIOInterruptible {
+        void run() throws IOException, InterruptedException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/trace/Tracer.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/trace/Tracer.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/trace/Tracer.java
index 79922b7..5313514 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/trace/Tracer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/trace/Tracer.java
@@ -38,7 +38,8 @@ public class Tracer implements ITracer {
 
     protected static final Level TRACE_LOG_LEVEL = Level.INFO;
     protected static final String CAT = "Tracer";
-    protected static final DateFormat DATE_FORMAT = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+    protected static final ThreadLocal<DateFormat> DATE_FORMAT = ThreadLocal
+            .withInitial(() -> new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
 
     protected final Logger traceLog;
     protected long categories;
@@ -67,9 +68,7 @@ public class Tracer implements ITracer {
     }
 
     public static String dateTimeStamp() {
-        synchronized (DATE_FORMAT) {
-            return "{\"datetime\":\"" + DATE_FORMAT.format(new Date()) + "\"}";
-        }
+        return "{\"datetime\":\"" + DATE_FORMAT.get().format(new Date()) + 
"\"}";
     }
 
     @Override

Reply via email to