Repository: asterixdb
Updated Branches:
  refs/heads/master ad758c525 -> d753479fe


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0e74a4c..b1909dd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.control.nc;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
@@ -84,6 +85,7 @@ import 
org.apache.hyracks.control.nc.net.MessagingNetworkManager;
 import org.apache.hyracks.control.nc.net.NetworkManager;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
+import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
 import org.apache.hyracks.ipc.api.IIPCEventListener;
 import org.apache.hyracks.ipc.api.IIPCHandle;
@@ -108,7 +110,7 @@ public class NodeControllerService implements 
IControllerService {
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
     private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
 
-    private NCConfig ncConfig;
+    private final NCConfig ncConfig;
 
     private final String id;
 
@@ -128,13 +130,15 @@ public class NodeControllerService implements 
IControllerService {
 
     private final Timer timer;
 
-    private boolean registrationPending;
+    private CcId primaryCcId;
 
-    private Exception registrationException;
+    private final Object ccLock = new Object();
 
-    private IClusterController primaryCcs;
+    private final Map<CcId, CcConnection> ccMap = 
Collections.synchronizedMap(new HashMap<>());
 
-    private final Map<CcId, IClusterController> ccsMap = 
Collections.synchronizedMap(new HashMap<>());
+    private final Map<InetSocketAddress, CcId> ccAddressMap = 
Collections.synchronizedMap(new HashMap<>());
+
+    private final Map<Integer, CcConnection> pendingRegistrations = 
Collections.synchronizedMap(new HashMap<>());
 
     private final Map<JobId, Joblet> jobletMap;
 
@@ -144,11 +148,9 @@ public class NodeControllerService implements 
IControllerService {
 
     private ExecutorService executor;
 
-    private NodeParameters nodeParameters;
-
-    private Map<IClusterController, Thread> heartbeatThreads = new 
ConcurrentHashMap<>();
+    private Map<CcId, Thread> heartbeatThreads = new ConcurrentHashMap<>();
 
-    private Map<IClusterController, Timer> ccTimers = new 
ConcurrentHashMap<>();
+    private Map<CcId, Timer> ccTimers = new ConcurrentHashMap<>();
 
     private final ServerContext serverCtx;
 
@@ -180,9 +182,7 @@ public class NodeControllerService implements 
IControllerService {
 
     private final ConfigManager configManager;
 
-    private NodeRegistration nodeRegistration;
-
-    private final AtomicLong maxJobId = new AtomicLong(-1);
+    private final Map<CcId, AtomicLong> maxJobIds = new ConcurrentHashMap<>();
 
     static {
         ExitUtil.init();
@@ -254,7 +254,7 @@ public class NodeControllerService implements 
IControllerService {
             }
             getNodeControllerInfosAcceptor.setValue(fv);
         }
-        primaryCcs.getNodeControllerInfos();
+        getPrimaryClusterController().getNodeControllerInfos();
         return fv.get();
     }
 
@@ -302,9 +302,7 @@ public class NodeControllerService implements 
IControllerService {
             messagingNetManager.start();
         }
 
-        final InetSocketAddress ccAddress =
-                new InetSocketAddress(ncConfig.getClusterAddress(), 
ncConfig.getClusterPort());
-        this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress);
+        this.primaryCcId = addCc(new 
InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()));
 
         workQueue.start();
 
@@ -315,64 +313,81 @@ public class NodeControllerService implements 
IControllerService {
         application.startupCompleted();
     }
 
-    public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress 
ccAddress) throws Exception {
-        ClusterControllerRemoteProxy ccProxy;
-        synchronized (ccsMap) {
-            if (ccsMap.containsKey(ccId)) {
-                throw new IllegalStateException("cc already registered: " + 
ccId);
+    public CcId addCc(InetSocketAddress ccAddress) throws Exception {
+        synchronized (ccLock) {
+            LOGGER.info("addCc: {}", ccAddress);
+            if (ccAddress.isUnresolved()) {
+                throw new IllegalArgumentException("must use resolved 
InetSocketAddress");
+            }
+            if (ccAddressMap.containsKey(ccAddress)) {
+                throw new IllegalStateException("cc already registered: " + 
ccAddress);
             }
             final IIPCEventListener ipcEventListener = new IIPCEventListener() 
{
                 @Override
                 public void ipcHandleRestored(IIPCHandle handle) throws 
IPCException {
                     // we need to re-register in case of NC -> CC connection 
reset
                     try {
-                        registerNode(ccsMap.get(ccId));
+                        
registerNode(getCcConnection(ccAddressMap.get(ccAddress)), ccAddress);
                     } catch (Exception e) {
                         LOGGER.log(Level.WARN, "Failed Registering with cc", 
e);
                         throw new IPCException(e);
                     }
                 }
             };
-            ccProxy = new ClusterControllerRemoteProxy(ccId,
+            ClusterControllerRemoteProxy ccProxy = new 
ClusterControllerRemoteProxy(
                     ipc.getHandle(ccAddress, 
ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
-            registerNode(ccProxy);
-            ccsMap.put(ccId, ccProxy);
+            CcConnection ccc = new CcConnection(ccProxy);
+            return registerNode(ccc, ccAddress);
         }
-        return ccProxy;
     }
 
-    public void makePrimaryCc(CcId ccId) throws Exception {
-        synchronized (ccsMap) {
-            if (!ccsMap.containsKey(ccId)) {
-                throw new IllegalArgumentException("unknown cc: " + ccId);
-            }
-            primaryCcs = ccsMap.get(ccId);
+    public void makePrimaryCc(InetSocketAddress ccAddress) throws Exception {
+        LOGGER.info("makePrimaryCc: {}", ccAddress);
+        if (ccAddress.isUnresolved()) {
+            throw new IllegalArgumentException("must use resolved 
InetSocketAddress");
+        }
+        CcId newPrimaryCc = ccAddressMap.get(ccAddress);
+        if (newPrimaryCc == null) {
+            throw new IllegalArgumentException("unknown cc: " + ccAddress);
         }
+        this.primaryCcId = newPrimaryCc;
     }
 
-    public void removeCc(CcId ccId) throws Exception {
-        synchronized (ccsMap) {
-            final IClusterController ccs = ccsMap.get(ccId);
-            if (ccs == null) {
-                throw new IllegalArgumentException("unknown cc: " + ccId);
+    public void removeCc(InetSocketAddress ccAddress) throws Exception {
+        synchronized (ccLock) {
+            LOGGER.info("removeCc: {}", ccAddress);
+            if (ccAddress.isUnresolved()) {
+                throw new IllegalArgumentException("must use resolved 
InetSocketAddress");
             }
-            if (primaryCcs.equals(ccs)) {
-                throw new IllegalStateException("cannot remove primary cc: " + 
ccId);
+            CcId ccId = ccAddressMap.get(ccAddress);
+            if (ccId == null) {
+                LOGGER.warn("ignoring request to remove unknown cc: {}", 
ccAddress);
+                return;
             }
-            // TODO(mblow): consider how to handle running jobs
-            ccs.unregisterNode(id);
-            Thread hbThread = heartbeatThreads.remove(ccs);
+            if (primaryCcId.equals(ccId)) {
+                throw new IllegalStateException("cannot remove primary cc: " + 
ccAddress);
+            }
+            try {
+                final CcConnection ccc = getCcConnection(ccId);
+                ccc.getClusterControllerService().unregisterNode(id);
+            } catch (Exception e) { // best-effort: a failed unregister must not block removal
+                LOGGER.warn("ignoring exception trying to gracefully 
unregister cc {}: {}", () -> ccId,
+                        () -> String.valueOf(e));
+            }
+            getWorkQueue().scheduleAndSync(new AbortAllJobsWork(this, ccId));
+            Thread hbThread = heartbeatThreads.remove(ccId);
             hbThread.interrupt();
-            Timer ccTimer = ccTimers.remove(ccs);
+            Timer ccTimer = ccTimers.remove(ccId);
             if (ccTimer != null) {
                 ccTimer.cancel();
             }
+            ccMap.remove(ccId);
+            ccAddressMap.remove(ccAddress);
         }
     }
 
-    protected void registerNode(IClusterController ccs) throws Exception {
-        LOGGER.info("Registering with Cluster Controller {}", ccs);
-        registrationPending = true;
+    protected CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) 
throws Exception {
+        LOGGER.info("Registering with Cluster Controller {}", ccc);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new 
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
             gcInfos[i] = new 
HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
@@ -389,54 +404,70 @@ public class NodeControllerService implements 
IControllerService {
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
         NetworkAddress messagingAddress =
                 messagingNetManager != null ? 
messagingNetManager.getPublicNetworkAddress() : null;
-        int allCores = osMXBean.getAvailableProcessors();
-        nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, 
netAddress, datasetAddress, osMXBean.getName(),
-                osMXBean.getArch(), osMXBean.getVersion(), allCores, 
runtimeMXBean.getVmName(),
-                runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), 
runtimeMXBean.getClassPath(),
-                runtimeMXBean.getLibraryPath(), 
runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema, 
messagingAddress, application.getCapacity(),
-                PidHelper.getPid(), maxJobId.get());
-
-        ccs.registerNode(nodeRegistration);
-
-        completeNodeRegistration(ccs);
+        NodeRegistration nodeRegistration = new NodeRegistration(ncAddress, 
id, ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), 
osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), 
runtimeMXBean.getVmVendor(),
+                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), 
runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), 
runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress,
+                application.getCapacity(), PidHelper.getPid());
+
+        pendingRegistrations.put(nodeRegistration.getRegistrationId(), ccc);
+        CcId ccId = ccc.registerNode(nodeRegistration);
+        ccMap.put(ccId, ccc);
+        ccAddressMap.put(ccAddress, ccId);
+        Serializable distributedState = 
ccc.getNodeParameters().getDistributedState();
+        if (distributedState != null) {
+            getDistributedState().put(ccId, distributedState);
+        }
+        application.onRegisterNode(ccId);
+        IClusterController ccs = ccc.getClusterControllerService();
+        NodeParameters nodeParameters = ccc.getNodeParameters();
 
         // Start heartbeat generator.
-        if (!heartbeatThreads.containsKey(ccs)) {
+        if (!heartbeatThreads.containsKey(ccId)) {
             Thread heartbeatThread =
                     new Thread(new HeartbeatTask(ccs, 
nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
             heartbeatThread.setPriority(Thread.MAX_PRIORITY);
             heartbeatThread.setDaemon(true);
             heartbeatThread.start();
-            heartbeatThreads.put(ccs, heartbeatThread);
+            heartbeatThreads.put(ccId, heartbeatThread);
         }
-        if (!ccTimers.containsKey(ccs) && 
nodeParameters.getProfileDumpPeriod() > 0) {
-            Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true);
+        if (!ccTimers.containsKey(ccId) && 
nodeParameters.getProfileDumpPeriod() > 0) {
+            Timer ccTimer = new Timer("Timer-" + ccId, true);
             // Schedule profile dump generator.
-            ccTimer.schedule(new ProfileDumpTask(ccs), 0, 
nodeParameters.getProfileDumpPeriod());
-            ccTimers.put(ccs, ccTimer);
+            ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, 
nodeParameters.getProfileDumpPeriod());
+            ccTimers.put(ccId, ccTimer);
         }
 
-        LOGGER.info("Registering with Cluster Controller {} complete", ccs);
+        LOGGER.info("Registering with Cluster Controller {} complete", ccc);
+        return ccId;
     }
 
-    synchronized void setNodeRegistrationResult(NodeParameters parameters, 
Exception exception) {
-        this.nodeParameters = parameters;
-        this.registrationException = exception;
-        this.registrationPending = false;
-        notifyAll();
+    void setNodeRegistrationResult(NodeParameters parameters, Exception 
exception) {
+        CcConnection ccc = getPendingNodeRegistration(parameters);
+        ccc.setNodeRegistrationResult(parameters, exception);
     }
 
-    private synchronized void completeNodeRegistration(IClusterController ccs) 
throws Exception {
-        while (registrationPending) {
-            wait();
+    private CcConnection getCcConnection(CcId ccId) {
+        CcConnection ccConnection = ccMap.get(ccId);
+        if (ccConnection == null) {
+            throw new IllegalArgumentException("unknown ccId: " + ccId);
         }
-        if (registrationException != null) {
-            LOGGER.log(Level.WARN, "Registering with Cluster Controller failed 
with exception", registrationException);
-            throw registrationException;
+        return ccConnection;
+    }
+
+    private CcConnection getPendingNodeRegistration(NodeParameters 
nodeParameters) {
+        CcConnection ccConnection = 
pendingRegistrations.remove(nodeParameters.getRegistrationId());
+        if (ccConnection == null) {
+            throw new IllegalStateException("Unknown pending node registration 
" + nodeParameters.getRegistrationId()
+                    + " for " + 
nodeParameters.getClusterControllerInfo().getCcId());
         }
-        serviceCtx.setDistributedState(nodeParameters.getDistributedState());
-        application.onRegisterNode(ccs.getCcId());
+        return ccConnection;
+    }
+
+    private ConcurrentHashMap<CcId, Serializable> getDistributedState() {
+        //noinspection unchecked
+        return (ConcurrentHashMap<CcId, Serializable>) 
serviceCtx.getDistributedState();
     }
 
     private void startApplication() throws Exception {
@@ -448,7 +479,12 @@ public class NodeControllerService implements 
IControllerService {
     }
 
     public void updateMaxJobId(JobId jobId) {
-        maxJobId.getAndUpdate(currentMaxId -> Math.max(currentMaxId, 
jobId.getId()));
+        maxJobIds.computeIfAbsent(jobId.getCcId(), key -> new AtomicLong(key.toLongMask()))
+                .getAndUpdate(currentMaxId -> Math.max(currentMaxId, 
jobId.getId()));
+    }
+
+    public long getMaxJobId(CcId ccId) {
+        return maxJobIds.computeIfAbsent(ccId, key -> new 
AtomicLong(ccId.toLongMask())).get();
     }
 
     @Override
@@ -478,10 +514,10 @@ public class NodeControllerService implements 
IControllerService {
                 t.interrupt();
                 InvokeUtil.doUninterruptibly(() -> t.join(1000));
             });
-            synchronized (ccsMap) {
-                ccsMap.values().parallelStream().forEach(ccs -> {
+            synchronized (ccLock) {
+                ccMap.values().parallelStream().forEach(cc -> {
                     try {
-                        ccs.notifyShutdown(id);
+                        cc.getClusterControllerService().notifyShutdown(id);
                     } catch (Exception e) {
                         LOGGER.log(Level.WARN, "Exception notifying CC of 
shutdown", e);
                     }
@@ -520,13 +556,8 @@ public class NodeControllerService implements 
IControllerService {
         jobParameterByteStoreMap.remove(jobId);
     }
 
-    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) 
throws HyracksException {
-        JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
-        if (jpbs == null) {
-            jpbs = new JobParameterByteStore();
-            jobParameterByteStoreMap.put(jobId, jpbs);
-        }
-        return jpbs;
+    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) 
{
+        return jobParameterByteStoreMap.computeIfAbsent(jobId, jid -> new 
JobParameterByteStore());
     }
 
     public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, 
ActivityClusterGraph acg)
@@ -550,7 +581,7 @@ public class NodeControllerService implements 
IControllerService {
         }
     }
 
-    public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId 
deployedJobSpecId) throws HyracksException {
+    public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId 
deployedJobSpecId) {
         return 
deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId());
     }
 
@@ -566,16 +597,21 @@ public class NodeControllerService implements 
IControllerService {
         return partitionManager;
     }
 
+    public CcId getPrimaryCcId() {
+        // TODO(mblow): this can change at any time, need notification 
framework
+        return primaryCcId;
+    }
+
     public IClusterController getPrimaryClusterController() {
-        return primaryCcs;
+        return getClusterController(primaryCcId);
     }
 
     public IClusterController getClusterController(CcId ccId) {
-        return ccsMap.get(ccId);
+        return getCcConnection(ccId).getClusterControllerService();
     }
 
-    public NodeParameters getNodeParameters() {
-        return nodeParameters;
+    public NodeParameters getNodeParameters(CcId ccId) {
+        return getCcConnection(ccId).getNodeParameters();
     }
 
     @Override
@@ -691,17 +727,19 @@ public class NodeControllerService implements 
IControllerService {
     }
 
     private class ProfileDumpTask extends TimerTask {
-        private IClusterController cc;
+        private final IClusterController cc;
+        private final CcId ccId;
 
-        public ProfileDumpTask(IClusterController cc) {
+        public ProfileDumpTask(IClusterController cc, CcId ccId) {
             this.cc = cc;
+            this.ccId = ccId;
         }
 
         @Override
         public void run() {
             try {
                 FutureValue<List<JobProfile>> fv = new FutureValue<>();
-                BuildJobProfilesWork bjpw = new 
BuildJobProfilesWork(NodeControllerService.this, cc.getCcId(), fv);
+                BuildJobProfilesWork bjpw = new 
BuildJobProfilesWork(NodeControllerService.this, ccId, fv);
                 workQueue.scheduleAndSync(bjpw);
                 List<JobProfile> profiles = fv.get();
                 if (!profiles.isEmpty()) {
@@ -734,7 +772,7 @@ public class NodeControllerService implements 
IControllerService {
     }
 
     public void sendApplicationMessageToCC(CcId ccId, byte[] data, 
DeploymentId deploymentId) throws Exception {
-        ccsMap.get(ccId).sendApplicationMessageToCC(data, deploymentId, id);
+        getClusterController(ccId).sendApplicationMessageToCC(data, 
deploymentId, id);
     }
 
     public IDatasetPartitionManager getDatasetPartitionManager() {
@@ -759,4 +797,5 @@ public class NodeControllerService implements 
IControllerService {
     public Object getApplicationContext() {
         return application.getApplicationContext();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
index 6a75471..8733022 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.control.nc.application;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IStateDumpHandler;
@@ -50,7 +51,7 @@ public class NCServiceContext extends ServiceContext 
implements INCServiceContex
 
     public NCServiceContext(NodeControllerService ncs, ServerContext 
serverCtx, IOManager ioManager, String nodeId,
             MemoryManager memoryManager, ILifeCycleComponentManager 
lifeCyclecomponentManager,
-            IApplicationConfig appConfig) throws IOException {
+            IApplicationConfig appConfig) {
         super(serverCtx, appConfig, new HyracksThreadFactory(nodeId));
         this.lccm = lifeCyclecomponentManager;
         this.nodeId = nodeId;
@@ -59,6 +60,7 @@ public class NCServiceContext extends ServiceContext 
implements INCServiceContex
         this.ncs = ncs;
         this.sdh = lccm::dumpState;
         this.tracer = new Tracer(nodeId, 
ncs.getConfiguration().getTraceCategories(), new TraceCategoryRegistry());
+        this.distributedState = new ConcurrentHashMap<>();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 54a171d..2742aaa 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.api.io.IIOFuture;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.util.file.FileUtil;
 
 public class IOManager implements IIOManager {
     /*
@@ -72,7 +73,11 @@ public class IOManager implements IIOManager {
         workspaces = new ArrayList<>();
         for (IODeviceHandle d : ioDevices) {
             if (d.getWorkspace() != null) {
-                new File(d.getMount(), d.getWorkspace()).mkdirs();
+                try {
+                    FileUtil.forceMkdirs(new File(d.getMount(), 
d.getWorkspace()));
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
+                }
                 workspaces.add(d);
             }
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-util/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-util/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
index c521f08..8de30ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
@@ -79,6 +79,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
index d6e175e..1b0093d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
@@ -19,10 +19,18 @@
 package org.apache.hyracks.util.file;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.regex.Pattern;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 public class FileUtil {
 
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final Object LOCK = new Object();
+
     private FileUtil() {
     }
 
@@ -30,6 +38,19 @@ public class FileUtil {
         return joinPath(File.separatorChar, elements);
     }
 
+    public static void forceMkdirs(File dir) throws IOException {
+        File canonicalDir = dir.getCanonicalFile();
+        try {
+            FileUtils.forceMkdir(canonicalDir);
+        } catch (IOException e) {
+            LOGGER.warn("failure to create directory {}, retrying", dir, e);
+            synchronized (LOCK) {
+                FileUtils.forceMkdir(canonicalDir);
+            }
+        }
+
+    }
+
     static String joinPath(char separatorChar, String... elements) {
         final String separator = String.valueOf(separatorChar);
         final String escapedSeparator = Pattern.quote(separator);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/src/test/resources/log4j2-test.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/src/test/resources/log4j2-test.xml 
b/hyracks-fullstack/src/test/resources/log4j2-test.xml
index d56f215..a8141ee 100644
--- a/hyracks-fullstack/src/test/resources/log4j2-test.xml
+++ b/hyracks-fullstack/src/test/resources/log4j2-test.xml
@@ -32,12 +32,10 @@
     <Root level="WARN">
       <AppenderRef ref="InfoLog"/>
     </Root>
-    <Logger name="org.apache.hyracks" level="INFO" additivity="false">
-      <AppenderRef ref="InfoLog"/>
-    </Logger>
-    <Logger name="org.apache.hyracks.test" level="INFO" additivity="false">
+    <Logger name="org.apache.hyracks.control.nc.service" level="INFO"/>
+    <Logger name="org.apache.hyracks" level="INFO"/>
+    <Logger name="org.apache.hyracks.test" level="INFO">
       <AppenderRef ref="ConsoleTest"/>
-      <AppenderRef ref="InfoLog"/>
     </Logger>
   </Loggers>
 </Configuration>

Reply via email to