Repository: asterixdb Updated Branches: refs/heads/master ad758c525 -> d753479fe
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 0e74a4c..b1909dd 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -20,6 +20,7 @@ package org.apache.hyracks.control.nc; import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; @@ -84,6 +85,7 @@ import org.apache.hyracks.control.nc.net.MessagingNetworkManager; import org.apache.hyracks.control.nc.net.NetworkManager; import org.apache.hyracks.control.nc.partitions.PartitionManager; import org.apache.hyracks.control.nc.resources.memory.MemoryManager; +import org.apache.hyracks.control.nc.work.AbortAllJobsWork; import org.apache.hyracks.control.nc.work.BuildJobProfilesWork; import org.apache.hyracks.ipc.api.IIPCEventListener; import org.apache.hyracks.ipc.api.IIPCHandle; @@ -108,7 +110,7 @@ public class NodeControllerService implements IControllerService { private static final double MEMORY_FUDGE_FACTOR = 0.8; private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1); - private NCConfig ncConfig; + private final NCConfig ncConfig; private final String id; @@ -128,13 +130,15 @@ public class NodeControllerService implements IControllerService { private final Timer timer; - private boolean registrationPending; + private CcId primaryCcId; - private Exception registrationException; + private final Object ccLock = new Object(); - private IClusterController primaryCcs; + private final Map<CcId, CcConnection> ccMap = Collections.synchronizedMap(new HashMap<>()); - private final Map<CcId, IClusterController> ccsMap = Collections.synchronizedMap(new HashMap<>()); + private final Map<InetSocketAddress, CcId> ccAddressMap = Collections.synchronizedMap(new HashMap<>()); + + private final Map<Integer, CcConnection> pendingRegistrations = Collections.synchronizedMap(new HashMap<>()); private final Map<JobId, Joblet> jobletMap; @@ -144,11 +148,9 @@ public class NodeControllerService implements IControllerService { private ExecutorService executor; - private NodeParameters nodeParameters; - - private Map<IClusterController, Thread> heartbeatThreads = new ConcurrentHashMap<>(); + private Map<CcId, Thread> heartbeatThreads = new ConcurrentHashMap<>(); - private Map<IClusterController, Timer> ccTimers = new ConcurrentHashMap<>(); + private Map<CcId, Timer> ccTimers = new ConcurrentHashMap<>(); private final ServerContext serverCtx; @@ -180,9 +182,7 @@ public class NodeControllerService implements IControllerService { private final ConfigManager configManager; - private NodeRegistration nodeRegistration; - - private final AtomicLong maxJobId = new AtomicLong(-1); + private final Map<CcId, AtomicLong> maxJobIds = new ConcurrentHashMap<>(); static { ExitUtil.init(); @@ -254,7 +254,7 @@ public class NodeControllerService implements IControllerService { } getNodeControllerInfosAcceptor.setValue(fv); } - primaryCcs.getNodeControllerInfos(); + getPrimaryClusterController().getNodeControllerInfos(); return fv.get(); } @@ -302,9 +302,7 @@ public class NodeControllerService implements IControllerService { messagingNetManager.start(); } - final InetSocketAddress ccAddress = - new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()); - this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress); + this.primaryCcId = addCc(new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort())); workQueue.start(); @@ -315,64 +313,81 @@ public class NodeControllerService implements IControllerService { application.startupCompleted(); } - public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress ccAddress) throws Exception { - ClusterControllerRemoteProxy ccProxy; - synchronized (ccsMap) { - if (ccsMap.containsKey(ccId)) { - throw new IllegalStateException("cc already registered: " + ccId); + public CcId addCc(InetSocketAddress ccAddress) throws Exception { + synchronized (ccLock) { + LOGGER.info("addCc: {}", ccAddress); + if (ccAddress.isUnresolved()) { + throw new IllegalArgumentException("must use resolved InetSocketAddress"); + } + if (ccAddressMap.containsKey(ccAddress)) { + throw new IllegalStateException("cc already registered: " + ccAddress); } final IIPCEventListener ipcEventListener = new IIPCEventListener() { @Override public void ipcHandleRestored(IIPCHandle handle) throws IPCException { // we need to re-register in case of NC -> CC connection reset try { - registerNode(ccsMap.get(ccId)); + registerNode(getCcConnection(ccAddressMap.get(ccAddress)), ccAddress); } catch (Exception e) { LOGGER.log(Level.WARN, "Failed Registering with cc", e); throw new IPCException(e); } } }; - ccProxy = new ClusterControllerRemoteProxy(ccId, + ClusterControllerRemoteProxy ccProxy = new ClusterControllerRemoteProxy( ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener)); - registerNode(ccProxy); - ccsMap.put(ccId, ccProxy); + CcConnection ccc = new CcConnection(ccProxy); + return registerNode(ccc, ccAddress); } - return ccProxy; } - public void makePrimaryCc(CcId ccId) throws Exception { - synchronized (ccsMap) { - if (!ccsMap.containsKey(ccId)) { - throw new IllegalArgumentException("unknown cc: " + ccId); - } - primaryCcs = ccsMap.get(ccId); + public void makePrimaryCc(InetSocketAddress ccAddress) throws Exception { + LOGGER.info("makePrimaryCc: {}", ccAddress); + if (ccAddress.isUnresolved()) { + throw new IllegalArgumentException("must use resolved InetSocketAddress"); + } + CcId newPrimaryCc = ccAddressMap.get(ccAddress); + if (newPrimaryCc == null) { + throw new IllegalArgumentException("unknown cc: " + ccAddress); } + this.primaryCcId = newPrimaryCc; } - public void removeCc(CcId ccId) throws Exception { - synchronized (ccsMap) { - final IClusterController ccs = ccsMap.get(ccId); - if (ccs == null) { - throw new IllegalArgumentException("unknown cc: " + ccId); + public void removeCc(InetSocketAddress ccAddress) throws Exception { + synchronized (ccLock) { + LOGGER.info("removeCc: {}", ccAddress); + if (ccAddress.isUnresolved()) { + throw new IllegalArgumentException("must use resolved InetSocketAddress"); } - if (primaryCcs.equals(ccs)) { - throw new IllegalStateException("cannot remove primary cc: " + ccId); + CcId ccId = ccAddressMap.get(ccAddress); + if (ccId == null) { + LOGGER.warn("ignoring request to remove unknown cc: {}", ccAddress); + return; } - // TODO(mblow): consider how to handle running jobs - ccs.unregisterNode(id); - Thread hbThread = heartbeatThreads.remove(ccs); + if (primaryCcId.equals(ccId)) { + throw new IllegalStateException("cannot remove primary cc: " + ccAddress); + } + try { + final CcConnection ccc = getCcConnection(ccId); + ccc.getClusterControllerService().unregisterNode(id); + } catch (Exception e) { + LOGGER.warn("ignoring exception trying to gracefully unregister cc {}: ", () -> ccId, + () -> String.valueOf(e)); + } + getWorkQueue().scheduleAndSync(new AbortAllJobsWork(this, ccId)); + Thread hbThread = heartbeatThreads.remove(ccId); hbThread.interrupt(); - Timer ccTimer = ccTimers.remove(ccs); + Timer ccTimer = ccTimers.remove(ccId); if (ccTimer != null) { ccTimer.cancel(); } + ccMap.remove(ccId); + ccAddressMap.remove(ccAddress); } } - protected void registerNode(IClusterController ccs) throws Exception { - LOGGER.info("Registering with Cluster Controller {}", ccs); - registrationPending = true; + protected CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception { + LOGGER.info("Registering with Cluster Controller {}", ccc); HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()]; for (int i = 0; i < gcInfos.length; ++i) { gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName()); @@ -389,54 +404,70 @@ public class NodeControllerService implements IControllerService { NetworkAddress netAddress = netManager.getPublicNetworkAddress(); NetworkAddress messagingAddress = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null; - int allCores = osMXBean.getAvailableProcessors(); - nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, osMXBean.getName(), - osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(), - runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), - runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(), - runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, application.getCapacity(), - PidHelper.getPid(), maxJobId.get()); - - ccs.registerNode(nodeRegistration); - - completeNodeRegistration(ccs); + NodeRegistration nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, + osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(), + runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), + runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), + runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, + application.getCapacity(), PidHelper.getPid()); + + pendingRegistrations.put(nodeRegistration.getRegistrationId(), ccc); + CcId ccId = ccc.registerNode(nodeRegistration); + ccMap.put(ccId, ccc); + ccAddressMap.put(ccAddress, ccId); + Serializable distributedState = ccc.getNodeParameters().getDistributedState(); + if (distributedState != null) { + getDistributedState().put(ccId, distributedState); + } + application.onRegisterNode(ccId); + IClusterController ccs = ccc.getClusterControllerService(); + NodeParameters nodeParameters = ccc.getNodeParameters(); // Start heartbeat generator. - if (!heartbeatThreads.containsKey(ccs)) { + if (!heartbeatThreads.containsKey(ccId)) { Thread heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat"); heartbeatThread.setPriority(Thread.MAX_PRIORITY); heartbeatThread.setDaemon(true); heartbeatThread.start(); - heartbeatThreads.put(ccs, heartbeatThread); + heartbeatThreads.put(ccId, heartbeatThread); } - if (!ccTimers.containsKey(ccs) && nodeParameters.getProfileDumpPeriod() > 0) { - Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true); + if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) { + Timer ccTimer = new Timer("Timer-" + ccId, true); // Schedule profile dump generator. - ccTimer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod()); - ccTimers.put(ccs, ccTimer); + ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod()); + ccTimers.put(ccId, ccTimer); } - LOGGER.info("Registering with Cluster Controller {} complete", ccs); + LOGGER.info("Registering with Cluster Controller {} complete", ccc); + return ccId; } - synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) { - this.nodeParameters = parameters; - this.registrationException = exception; - this.registrationPending = false; - notifyAll(); + void setNodeRegistrationResult(NodeParameters parameters, Exception exception) { + CcConnection ccc = getPendingNodeRegistration(parameters); + ccc.setNodeRegistrationResult(parameters, exception); } - private synchronized void completeNodeRegistration(IClusterController ccs) throws Exception { - while (registrationPending) { - wait(); + private CcConnection getCcConnection(CcId ccId) { + CcConnection ccConnection = ccMap.get(ccId); + if (ccConnection == null) { + throw new IllegalArgumentException("unknown ccId: " + ccId); } - if (registrationException != null) { - LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception", registrationException); - throw registrationException; + return ccConnection; + } + + private CcConnection getPendingNodeRegistration(NodeParameters nodeParameters) { + CcConnection ccConnection = pendingRegistrations.remove(nodeParameters.getRegistrationId()); + if (ccConnection == null) { + throw new IllegalStateException("Unknown pending node registration " + nodeParameters.getRegistrationId() + + " for " + nodeParameters.getClusterControllerInfo().getCcId()); } - serviceCtx.setDistributedState(nodeParameters.getDistributedState()); - application.onRegisterNode(ccs.getCcId()); + return ccConnection; + } + + private ConcurrentHashMap<CcId, Serializable> getDistributedState() { + //noinspection unchecked + return (ConcurrentHashMap<CcId, Serializable>) serviceCtx.getDistributedState(); } private void startApplication() throws Exception { @@ -448,7 +479,12 @@ public class NodeControllerService implements IControllerService { } public void updateMaxJobId(JobId jobId) { - maxJobId.getAndUpdate(currentMaxId -> Math.max(currentMaxId, jobId.getId())); + maxJobIds.computeIfAbsent(jobId.getCcId(), key -> new AtomicLong()) + .getAndUpdate(currentMaxId -> Math.max(currentMaxId, jobId.getId())); + } + + public long getMaxJobId(CcId ccId) { + return maxJobIds.computeIfAbsent(ccId, key -> new AtomicLong(ccId.toLongMask())).get(); } @Override @@ -478,10 +514,10 @@ public class NodeControllerService implements IControllerService { t.interrupt(); InvokeUtil.doUninterruptibly(() -> t.join(1000)); }); - synchronized (ccsMap) { - ccsMap.values().parallelStream().forEach(ccs -> { + synchronized (ccLock) { + ccMap.values().parallelStream().forEach(cc -> { try { - ccs.notifyShutdown(id); + cc.getClusterControllerService().notifyShutdown(id); } catch (Exception e) { LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e); } @@ -520,13 +556,8 @@ public class NodeControllerService implements IControllerService { jobParameterByteStoreMap.remove(jobId); } - public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException { - JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId); - if (jpbs == null) { - jpbs = new JobParameterByteStore(); - jobParameterByteStoreMap.put(jobId, jpbs); - } - return jpbs; + public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) { + return jobParameterByteStoreMap.computeIfAbsent(jobId, jid -> new JobParameterByteStore()); } public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph acg) @@ -550,7 +581,7 @@ public class NodeControllerService implements IControllerService { } } - public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) throws HyracksException { + public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) { return deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()); } @@ -566,16 +597,21 @@ public class NodeControllerService implements IControllerService { return partitionManager; } + public CcId getPrimaryCcId() { + // TODO(mblow): this can change at any time, need notification framework + return primaryCcId; + } + public IClusterController getPrimaryClusterController() { - return primaryCcs; + return getClusterController(primaryCcId); } public IClusterController getClusterController(CcId ccId) { - return ccsMap.get(ccId); + return getCcConnection(ccId).getClusterControllerService(); } - public NodeParameters getNodeParameters() { - return nodeParameters; + public NodeParameters getNodeParameters(CcId ccId) { + return getCcConnection(ccId).getNodeParameters(); } @Override @@ -691,17 +727,19 @@ public class NodeControllerService implements IControllerService { } private class ProfileDumpTask extends TimerTask { - private IClusterController cc; + private final IClusterController cc; + private final CcId ccId; - public ProfileDumpTask(IClusterController cc) { + public ProfileDumpTask(IClusterController cc, CcId ccId) { this.cc = cc; + this.ccId = ccId; } @Override public void run() { try { FutureValue<List<JobProfile>> fv = new FutureValue<>(); - BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, cc.getCcId(), fv); + BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, ccId, fv); workQueue.scheduleAndSync(bjpw); List<JobProfile> profiles = fv.get(); if (!profiles.isEmpty()) { @@ -734,7 +772,7 @@ public class NodeControllerService implements IControllerService { } public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception { - ccsMap.get(ccId).sendApplicationMessageToCC(data, deploymentId, id); + getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id); } public IDatasetPartitionManager getDatasetPartitionManager() { @@ -759,4 +797,5 @@ public class NodeControllerService implements IControllerService { public Object getApplicationContext() { return application.getApplicationContext(); } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java index 6a75471..8733022 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java @@ -20,6 +20,7 @@ package org.apache.hyracks.control.nc.application; import java.io.IOException; import java.io.Serializable; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.application.IStateDumpHandler; @@ -50,7 +51,7 @@ public class NCServiceContext extends ServiceContext implements INCServiceContex public NCServiceContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager, String nodeId, MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager, - IApplicationConfig appConfig) throws IOException { + IApplicationConfig appConfig) { super(serverCtx, appConfig, new HyracksThreadFactory(nodeId)); this.lccm = lifeCyclecomponentManager; this.nodeId = nodeId; @@ -59,6 +60,7 @@ public class NCServiceContext extends ServiceContext implements INCServiceContex this.ncs = ncs; this.sdh = lccm::dumpState; this.tracer = new Tracer(nodeId, ncs.getConfiguration().getTraceCategories(), new TraceCategoryRegistry()); + this.distributedState = new ConcurrentHashMap<>(); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java index 54a171d..2742aaa 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java @@ -41,6 +41,7 @@ import org.apache.hyracks.api.io.IIOFuture; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.api.util.IoUtil; +import org.apache.hyracks.util.file.FileUtil; public class IOManager implements IIOManager { /* @@ -72,7 +73,11 @@ public class IOManager implements IIOManager { workspaces = new ArrayList<>(); for (IODeviceHandle d : ioDevices) { if (d.getWorkspace() != null) { - new File(d.getMount(), d.getWorkspace()).mkdirs(); + try { + FileUtil.forceMkdirs(new File(d.getMount(), d.getWorkspace())); + } catch (IOException e) { + throw HyracksDataException.create(e); + } workspaces.add(d); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-util/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-util/pom.xml b/hyracks-fullstack/hyracks/hyracks-util/pom.xml index c521f08..8de30ae 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-util/pom.xml @@ -79,6 +79,10 @@ <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java index d6e175e..1b0093d 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java @@ -19,10 +19,18 @@ package org.apache.hyracks.util.file; import java.io.File; +import java.io.IOException; import java.util.regex.Pattern; +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + public class FileUtil { + private static final Logger LOGGER = LogManager.getLogger(); + private static final Object LOCK = new Object(); + private FileUtil() { } @@ -30,6 +38,19 @@ public class FileUtil { return joinPath(File.separatorChar, elements); } + public static void forceMkdirs(File dir) throws IOException { + File canonicalDir = dir.getCanonicalFile(); + try { + FileUtils.forceMkdir(canonicalDir); + } catch (IOException e) { + LOGGER.warn("failure to create directory {}, retrying", dir, e); + synchronized (LOCK) { + FileUtils.forceMkdir(canonicalDir); + } + } + + } + static String joinPath(char separatorChar, String... elements) { final String separator = String.valueOf(separatorChar); final String escapedSeparator = Pattern.quote(separator); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/src/test/resources/log4j2-test.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/src/test/resources/log4j2-test.xml b/hyracks-fullstack/src/test/resources/log4j2-test.xml index d56f215..a8141ee 100644 --- a/hyracks-fullstack/src/test/resources/log4j2-test.xml +++ b/hyracks-fullstack/src/test/resources/log4j2-test.xml @@ -32,12 +32,10 @@ <Root level="WARN"> <AppenderRef ref="InfoLog"/> </Root> - <Logger name="org.apache.hyracks" level="INFO" additivity="false"> - <AppenderRef ref="InfoLog"/> - </Logger> - <Logger name="org.apache.hyracks.test" level="INFO" additivity="false"> + <Logger name="org.apache.hyracks.control.nc.service" level="INFO"/> + <Logger name="org.apache.hyracks" level="INFO"/> + <Logger name="org.apache.hyracks.test" level="INFO"> <AppenderRef ref="ConsoleTest"/> - <AppenderRef ref="InfoLog"/> </Logger> </Loggers> </Configuration>
