http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index 2ebfe78..eb14bae 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -35,7 +35,6 @@ import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; -import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.dataflow.value.ITypeTraits; @@ -161,8 +160,9 @@ public class IndexUtil { * the metadata provider. * @return the AsterixDB job id for transaction management. */ - public static void bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) { - TxnId txnId = TxnIdFactory.create(); + public static void bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) + throws AlgebricksException { + TxnId txnId = metadataProvider.getTxnIdFactory().create(); metadataProvider.setTxnId(txnId); boolean isWriteTransaction = metadataProvider.isWriteTransaction(); IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, isWriteTransaction);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java index 4b95253..db2a044 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java @@ -24,9 +24,10 @@ import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.transactions.IResourceIdManager; -import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; @@ -37,19 +38,27 @@ public class ReportLocalCountersMessage implements ICcAddressedMessage { private static final Logger LOGGER = LogManager.getLogger(); private final long maxResourceId; private final long maxTxnId; + private final long maxJobId; private final String src; - public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId) { + public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId, long maxJobId) { this.src = src; this.maxResourceId = maxResourceId; this.maxTxnId = maxTxnId; + this.maxJobId = maxJobId; } @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); - TxnIdFactory.ensureMinimumId(maxTxnId); + try { + appCtx.getTxnIdFactory().ensureMinimumId(maxTxnId); + } catch (AlgebricksException e) { + throw HyracksDataException.create(e); + } resourceIdManager.report(src, maxResourceId); + ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobIdFactory() + .setMaxJobId(maxJobId); } public static void send(CcId ccId, NodeControllerService ncs) throws HyracksDataException { @@ -57,8 +66,9 @@ public class ReportLocalCountersMessage implements ICcAddressedMessage { long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(), MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); long maxTxnId = appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId(); + long maxJobId = ncs.getMaxJobId(ccId); ReportLocalCountersMessage countersMessage = - new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId); + new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId, maxJobId); try { ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(ccId, countersMessage); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index d8f14a2..b83df6c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -24,6 +24,8 @@ import java.util.function.Supplier; import org.apache.asterix.common.api.ICoordinationService; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.api.INodeJobTracker; +import org.apache.asterix.common.transactions.ILongBlockFactory; +import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.ActiveProperties; @@ -85,12 +87,14 @@ public class CcApplicationContext implements ICcApplicationContext { private IMetadataLockManager mdLockManager; private IClusterStateManager clusterStateManager; private final INodeJobTracker nodeJobTracker; + private final ITxnIdFactory txnIdFactory; public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider, - IMetadataLockManager mdLockManager) throws AlgebricksException, IOException { + IMetadataLockManager mdLockManager, Supplier<ILongBlockFactory> txnIdBlockSupplier) + throws AlgebricksException, IOException { this.ccServiceCtx = ccServiceCtx; this.hcc = hcc; this.libraryManager = libraryManager; @@ -118,6 +122,7 @@ public class CcApplicationContext implements ICcApplicationContext { clusterStateManager.setCcAppCtx(this); this.resourceIdManager = new ResourceIdManager(clusterStateManager); nodeJobTracker = new NodeJobTracker(); + txnIdFactory = new CcTxnIdFactory(txnIdBlockSupplier); } @Override @@ -265,4 +270,8 @@ public class CcApplicationContext implements ICcApplicationContext { public ICoordinationService getCoordinationService() { return NoOpCoordinationService.INSTANCE; } + + public ITxnIdFactory getTxnIdFactory() { + return txnIdFactory; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java new file mode 100644 index 0000000..82bbe6b --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.utils; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import org.apache.asterix.common.transactions.ILongBlockFactory; +import org.apache.asterix.common.transactions.ITxnIdFactory; +import org.apache.asterix.common.transactions.TxnId; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Represents a factory to generate unique transaction IDs. + */ +class CcTxnIdFactory implements ITxnIdFactory { + private static final int TXN_BLOCK_SIZE = 1024; + private static final Logger LOGGER = LogManager.getLogger(); + + private final Supplier<ILongBlockFactory> blockFactorySupplier; + private volatile Block block = new Block(0, 0); + + public CcTxnIdFactory(Supplier<ILongBlockFactory> blockFactorySupplier) { + this.blockFactorySupplier = blockFactorySupplier; + } + + @Override + public TxnId create() throws AlgebricksException { + while (true) { + try { + return new TxnId(block.nextId()); + } catch (BlockExhaustedException ex) { + // retry + LOGGER.info("block exhausted; obtaining new block from supplier"); + block = new Block(blockFactorySupplier.get().getBlock(TXN_BLOCK_SIZE), TXN_BLOCK_SIZE); + } + } + } + + @Override + public void ensureMinimumId(long id) throws AlgebricksException { + blockFactorySupplier.get().ensureMinimum(id); + } + + static class Block { + private static final BlockExhaustedException BLOCK_EXHAUSTED_EXCEPTION = new BlockExhaustedException(); + private final AtomicLong id; + private final long start; + private final long endExclusive; + + private Block(long start, long blockSize) { + this.id = new AtomicLong(start); + this.start = start; + this.endExclusive = start + blockSize; + } + + private long nextId() throws BlockExhaustedException { + long nextId = id.incrementAndGet(); + if (nextId >= endExclusive && (endExclusive >= start || nextId < start)) { + throw BLOCK_EXHAUSTED_EXCEPTION; + } + return nextId; + } + } + + private static class BlockExhaustedException extends Exception { + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java index 406a762..201945c 100644 --- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java +++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java @@ -22,27 +22,28 @@ import java.io.File; import javax.xml.XMLConstants; import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; +import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.SAXParser; import javax.xml.parsers.SAXParserFactory; import javax.xml.transform.sax.SAXSource; import org.xml.sax.InputSource; +import org.xml.sax.SAXException; public class TestSuiteParser { - public TestSuiteParser() { - } - public org.apache.asterix.testframework.xml.TestSuite parse(File testSuiteCatalog) throws Exception { + public TestSuite parse(File testSuiteCatalog) throws SAXException, JAXBException, ParserConfigurationException { SAXParserFactory saxParserFactory = SAXParserFactory.newInstance(); saxParserFactory.setNamespaceAware(true); saxParserFactory.setXIncludeAware(true); SAXParser saxParser = saxParserFactory.newSAXParser(); saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file"); - JAXBContext ctx = JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class); + JAXBContext ctx = JAXBContext.newInstance(TestSuite.class); Unmarshaller um = ctx.createUnmarshaller(); - return (org.apache.asterix.testframework.xml.TestSuite) um.unmarshal( + return (TestSuite) um.unmarshal( new SAXSource(saxParser.getXMLReader(), new InputSource(testSuiteCatalog.toURI().toString()))); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java deleted file mode 100644 index eb59e74..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.transaction; - -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.asterix.common.transactions.TxnId; - -/** - * Represents a factory to generate unique transaction IDs. - */ -public class TxnIdFactory { - - private static final AtomicLong id = new AtomicLong(); - - private TxnIdFactory() { - } - - public static TxnId create() { - return new TxnId(id.incrementAndGet()); - } - - public static void ensureMinimumId(long id) { - TxnIdFactory.id.updateAndGet(current -> Math.max(current, id)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java index 3d69ddb..0e04dca 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java @@ -18,23 +18,32 @@ */ package org.apache.hyracks.api.client; +import org.apache.hyracks.api.control.CcId; + import java.io.Serializable; public class ClusterControllerInfo implements Serializable { private static final long serialVersionUID = 1L; + private final CcId ccId; + private final String clientNetAddress; private final int clientNetPort; private final int webPort; - public ClusterControllerInfo(String clientNetAddress, int clientNetPort, int webPort) { + public ClusterControllerInfo(CcId ccId, String clientNetAddress, int clientNetPort, int webPort) { + this.ccId = ccId; this.clientNetAddress = clientNetAddress; this.clientNetPort = clientNetPort; this.webPort = webPort; } + public CcId getCcId() { + return ccId; + } + public int getWebPort() { return webPort; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java index d2a254f..aee22c9 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.api.config; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; public interface IOptionType<T> { @@ -26,6 +27,11 @@ public interface IOptionType<T> { */ T parse(String s); + /** + * @throws IllegalArgumentException when the supplied JSON node cannot be interpreted + */ + T parse(JsonNode node); + Class<T> targetType(); /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java index 32782fd..2a7be9d 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java @@ -45,6 +45,10 @@ public class CcId implements Serializable { return id; } + public long toLongMask() { + return (long) id << CcIdPartitionedLongFactory.ID_BITS; + } + @Override public int hashCode() { return id; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcIdPartitionedLongFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcIdPartitionedLongFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcIdPartitionedLongFactory.java new file mode 100644 index 0000000..0a26494 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcIdPartitionedLongFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.control; + +import java.util.concurrent.atomic.AtomicLong; + +public class CcIdPartitionedLongFactory { + private static final int CC_BITS = Short.SIZE; + public static final int ID_BITS = Long.SIZE - CC_BITS; + public static final long MAX_ID = (1L << ID_BITS) - 1; + private final CcId ccId; + private final AtomicLong id; + + public CcIdPartitionedLongFactory(CcId ccId) { + this.ccId = ccId; + id = new AtomicLong(ccId.toLongMask()); + } + + protected long nextId() { + return id.getAndUpdate(prev -> { + if ((prev & MAX_ID) == MAX_ID) { + return prev ^ MAX_ID; + } else { + return prev + 1; + } + }); + } + + protected long maxId() { + long next = id.get(); + if ((next & MAX_ID) == 0) { + return next | MAX_ID; + } else { + return next - 1; + } + } + + protected void ensureMinimumId(long id) { + if ((id & ~MAX_ID) != ccId.toLongMask()) { + throw new IllegalArgumentException("cannot change ccId as part of ensureMinimumId() (was: " + + Long.toHexString(this.id.get()) + ", given: " + Long.toHexString(id)); + } + this.id.updateAndGet(current -> Math.max(current, id)); + } + + public CcId getCcId() { + return ccId; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java index c83366f..de6b5ff 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java @@ -22,23 +22,24 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.control.CcIdPartitionedLongFactory; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IWritable; public final class JobId implements IWritable, Serializable, Comparable { - private static final int CC_BITS = Short.SIZE; - static final int ID_BITS = Long.SIZE - CC_BITS; - static final long MAX_ID = (1L << ID_BITS) - 1; + private static final Pattern jobIdPattern = Pattern.compile("^JID:(\\d+)\\.(\\d+)$"); public static final JobId INVALID = null; private static final long serialVersionUID = 1L; private long id; - private transient CcId ccId; + private transient volatile CcId ccId; public static JobId create(DataInput dis) throws IOException { JobId jobId = new JobId(); @@ -59,13 +60,13 @@ public final class JobId implements IWritable, Serializable, Comparable { public CcId getCcId() { if (ccId == null) { - ccId = CcId.valueOf((int) (id >>> ID_BITS)); + ccId = CcId.valueOf((int) (id >>> CcIdPartitionedLongFactory.ID_BITS)); } return ccId; } public long getIdOnly() { - return id & MAX_ID; + return id & CcIdPartitionedLongFactory.MAX_ID; } @Override @@ -80,13 +81,17 @@ public final class JobId implements IWritable, Serializable, Comparable { @Override public String toString() { - return "JID:" + id; + return "JID:" + (id >>> CcIdPartitionedLongFactory.ID_BITS) + "." + getIdOnly(); } public static JobId parse(String str) throws HyracksDataException { - if (str.startsWith("JID:")) { - str = str.substring(4); - return new JobId(Long.parseLong(str)); + Matcher m = jobIdPattern.matcher(str); + if (m.matches()) { + int ccId = Integer.parseInt(m.group(1)); + if (ccId <= 0xffff && ccId >= 0) { + long jobId = Long.parseLong(m.group(2)) | (long) ccId << CcIdPartitionedLongFactory.ID_BITS; + return new JobId(jobId); + } } throw HyracksDataException.create(ErrorCode.NOT_A_JOBID, str); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java index 1bb5749..528d35b 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java @@ -18,36 +18,23 @@ */ package org.apache.hyracks.api.job; -import static org.apache.hyracks.api.job.JobId.ID_BITS; -import static org.apache.hyracks.api.job.JobId.MAX_ID; - -import java.util.concurrent.atomic.AtomicLong; - import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.control.CcIdPartitionedLongFactory; -public class JobIdFactory { - private final AtomicLong id; - +public class JobIdFactory extends CcIdPartitionedLongFactory { public JobIdFactory(CcId ccId) { - id = new AtomicLong((long) ccId.shortValue() << ID_BITS); + super(ccId); } public JobId create() { - return new JobId(id.getAndUpdate(prev -> { - if ((prev & MAX_ID) == MAX_ID) { - return prev ^ MAX_ID; - } else { - return prev + 1; - } - })); + return new JobId(nextId()); } public JobId maxJobId() { - long next = id.get(); - if ((next & MAX_ID) == 0) { - return new JobId(next | MAX_ID); - } else { - return new JobId(next - 1); - } + return new JobId(maxId()); + } + + public void setMaxJobId(long maxJobId) { + ensureMinimumId(maxJobId + 1); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java deleted file mode 100644 index 709f098..0000000 --- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.api.job; - -import java.lang.reflect.Field; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.hyracks.api.control.CcId; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -public class JobIdFactoryTest { - - private static Field idField; - - @BeforeClass - public static void setup() throws NoSuchFieldException { - idField = JobIdFactory.class.getDeclaredField("id"); - idField.setAccessible(true); - } - - @Test - public void testCcIds() { - JobIdFactory factory = new JobIdFactory(CcId.valueOf(0)); - for (int i = 0; i < 1000; i++) { - final JobId jobId = factory.create(); - Assert.assertEquals(0, jobId.getCcId().shortValue()); - Assert.assertEquals(i, jobId.getIdOnly()); - } - } - - @Test - public void testNegativeCcId() { - JobIdFactory factory = new JobIdFactory(CcId.valueOf(0xFFFF)); - for (int i = 0; i < 1000; i++) { - final JobId jobId = factory.create(); - Assert.assertEquals((short) 0xFFFF, jobId.getCcId().shortValue()); - Assert.assertEquals(i, jobId.getIdOnly()); - Assert.assertTrue("JID not negative", jobId.getId() < 0); - Assert.assertEquals(0xFFFF000000000000L + i, jobId.getId()); - } - } - - @Test - public void testOverflow() throws IllegalAccessException { - testOverflow(0); - testOverflow(0xFFFF); - testOverflow(Short.MAX_VALUE); - } - - private void testOverflow(int id) throws IllegalAccessException { - CcId ccId = CcId.valueOf(id); - long expected = (long) id << 48; - JobIdFactory factory = new JobIdFactory(ccId); - AtomicLong theId = (AtomicLong) idField.get(factory); - Assert.assertEquals(expected, theId.get()); - theId.set((((long) 1 << 48) - 1) | expected); - JobId jobId = factory.create(); - Assert.assertEquals(ccId, jobId.getCcId()); - Assert.assertEquals(JobId.MAX_ID, jobId.getIdOnly()); - jobId = factory.create(); - Assert.assertEquals(ccId, jobId.getCcId()); - Assert.assertEquals(0, jobId.getIdOnly()); - } - - @Test - public void testComparability() throws IllegalAccessException { - JobIdFactory factory = new JobIdFactory(CcId.valueOf(0)); - compareLoop(factory, false); - factory = new JobIdFactory(CcId.valueOf(0xFFFF)); - compareLoop(factory, false); - AtomicLong theId = (AtomicLong) idField.get(factory); - theId.set(0xFFFFFFFFFFFFFFF0L); - compareLoop(factory, true); - } - - private void compareLoop(JobIdFactory factory, boolean overflow) { - Set<Boolean> overflowed = new HashSet<>(Collections.singleton(false)); - JobId prevMax = null; - for (int i = 0; i < 1000; i++) { - final JobId jobId = factory.create(); - Assert.assertTrue("max == last", factory.maxJobId().compareTo(jobId) == 0); - if (i > 0) { - Assert.assertTrue("last > previous max", prevMax.compareTo(jobId) < 0 || overflowed.add(overflow)); - } - prevMax = factory.maxJobId(); - } - } - - @Test - public void testTooLarge() { - try { - CcId.valueOf(0x10000); - Assert.assertTrue("expected exception", false); - } catch (IllegalArgumentException e) { - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdTest.java new file mode 100644 index 0000000..d2c1d09 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.control.CcIdPartitionedLongFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class JobIdTest { + + private static Field idField; + + @BeforeClass + public static void setup() throws NoSuchFieldException { + idField = CcIdPartitionedLongFactory.class.getDeclaredField("id"); + idField.setAccessible(true); + } + + @Test + public void testCcIds() { + JobIdFactory factory = new JobIdFactory(CcId.valueOf(0)); + for (int i = 0; i < 1000; i++) { + final JobId jobId = factory.create(); + Assert.assertEquals(0, jobId.getCcId().shortValue()); + Assert.assertEquals(i, jobId.getIdOnly()); + } + } + + @Test + public void testNegativeCcId() { + JobIdFactory factory = new JobIdFactory(CcId.valueOf(0xFFFF)); + for (int i = 0; i < 1000; i++) { + final JobId jobId = factory.create(); + Assert.assertEquals((short) 0xFFFF, jobId.getCcId().shortValue()); + Assert.assertEquals(i, jobId.getIdOnly()); + Assert.assertTrue("JID not negative", jobId.getId() < 0); + Assert.assertEquals(0xFFFF000000000000L + i, jobId.getId()); + } + } + + @Test + public void testOverflow() throws IllegalAccessException { + testOverflow(0); + testOverflow(0xFFFF); + testOverflow(Short.MAX_VALUE); + } + + private void testOverflow(int id) throws IllegalAccessException { + CcId ccId = CcId.valueOf(id); + long expected = (long) id << 48; + JobIdFactory factory = new JobIdFactory(ccId); + AtomicLong theId = (AtomicLong) idField.get(factory); + Assert.assertEquals(expected, theId.get()); + theId.set((((long) 1 << 48) - 1) | expected); + JobId jobId = factory.create(); + Assert.assertEquals(ccId, jobId.getCcId()); + Assert.assertEquals(CcIdPartitionedLongFactory.MAX_ID, jobId.getIdOnly()); + jobId = factory.create(); + Assert.assertEquals(ccId, jobId.getCcId()); + Assert.assertEquals(0, jobId.getIdOnly()); + } + + @Test + public void testComparability() throws IllegalAccessException { + JobIdFactory factory = new JobIdFactory(CcId.valueOf(0)); + compareLoop(factory, false); + factory = new JobIdFactory(CcId.valueOf(0xFFFF)); + compareLoop(factory, false); + AtomicLong theId = (AtomicLong) idField.get(factory); + theId.set(0xFFFFFFFFFFFFFFF0L); + compareLoop(factory, true); + } + + private void compareLoop(JobIdFactory factory, boolean overflow) { + Set<Boolean> overflowed = new HashSet<>(Collections.singleton(false)); + JobId prevMax = null; + for (int i = 0; i < 1000; i++) { + final JobId jobId = factory.create(); + Assert.assertTrue("max == last", factory.maxJobId().compareTo(jobId) == 0); + if (i > 0) { + Assert.assertTrue("last > previous max", prevMax.compareTo(jobId) < 0 || overflowed.add(overflow)); + } + prevMax = factory.maxJobId(); + } + } + + @Test + public void testTooLarge() { + try { + CcId.valueOf(0x10000); + Assert.assertTrue("expected exception", false); + } catch (IllegalArgumentException e) { + } + } + + @Test + public void testParse() throws HyracksDataException { + for (int ccId : Arrays.asList(0xFFFF, 0, (int) Short.MAX_VALUE)) { + JobIdFactory factory = new JobIdFactory(CcId.valueOf(ccId)); + for (int i = 0; i < 1000; i++) { + final JobId jobId = factory.create(); + Assert.assertEquals(jobId.getId(), JobId.parse(jobId.toString()).getId()); + Assert.assertEquals(jobId, JobId.parse(jobId.toString())); + Assert.assertFalse(jobId.toString(), jobId.toString().contains("-")); + System.err.println(jobId.toString()); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index a6edd70..f8fe77f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -215,7 +215,7 @@ public class ClusterControllerService implements IControllerService { clusterIPC.start(); clientIPC.start(); webServer.start(); - info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(), + info = new ClusterControllerInfo(ccId, ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(), webServer.getListeningPort()); timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriodMillis()); jobLog.open(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java index 26245e1..de166dd 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java @@ -42,7 +42,6 @@ import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.application.ServiceContext; import org.apache.hyracks.control.common.context.ServerContext; import org.apache.hyracks.control.common.utils.HyracksThreadFactory; -import org.apache.hyracks.control.common.work.IResultCallback; public class CCServiceContext extends ServiceContext implements ICCServiceContext { private final ICCContext ccContext; @@ -50,9 +49,6 @@ public class CCServiceContext extends ServiceContext implements ICCServiceContex protected final Set<String> initPendingNodeIds; protected final Set<String> deinitPendingNodeIds; - protected IResultCallback<Object> initializationCallback; - protected IResultCallback<Object> deinitializationCallback; - private List<IJobLifecycleListener> jobLifecycleListeners; private List<IClusterLifecycleListener> clusterLifecycleListeners; private final ClusterControllerService ccs; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index 77ecbee..96f5f1b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -71,6 +71,7 @@ public class RegisterNodeWork extends SynchronizableWork { params.setDistributedState(ccs.getContext().getDistributedState()); params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis()); params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod()); + params.setRegistrationId(reg.getRegistrationId()); result = new CCNCFunctions.NodeRegistrationResult(params, null); } catch (Exception e) { LOGGER.log(Level.WARN, "Node registration failed", e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index 9cf84dd..a95ae3d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -73,6 +73,4 @@ public interface IClusterController { void getNodeControllerInfos() throws Exception; void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception; - - CcId getCcId(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java index 986ca96..fd8c116 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java @@ -73,7 +73,9 @@ public class ConfigManager implements IConfigManager, Serializable { new CompositeMap<>(definedMap, defaultMap, new NoOpMapMutator()); private EnumMap<Section, Map<String, IOption>> sectionMap = new EnumMap<>(Section.class); @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map - private Map<String, Map<IOption, Object>> nodeSpecificMap = Collections.synchronizedMap(new TreeMap<>()); + private Map<String, Map<IOption, Object>> nodeSpecificDefinedMap = Collections.synchronizedMap(new TreeMap<>()); + @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map + private Map<String, Map<IOption, Object>> nodeSpecificDefaultMap = Collections.synchronizedMap(new TreeMap<>()); private transient ArrayListValuedHashMap<IOption, IConfigSetter> optionSetters = new ArrayListValuedHashMap<>(); private final String[] args; private ConfigManagerApplicationConfig appConfig = new ConfigManagerApplicationConfig(this); @@ -154,26 +156,28 @@ public class ConfigManager implements IConfigManager, Serializable { } } else { registeredOptions.add(option); - optionSetters.put(option, - (node, value, - isDefault) -> correctedMap(option.section() == Section.NC ? node : null, isDefault) - .put(option, value)); + optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value)); if (LOGGER.isDebugEnabled()) { - optionSetters.put(option, (node, value, isDefault) -> LOGGER - .debug((isDefault ? "defaulting" : "setting ") + option.toIniString() + " to " + value)); + optionSetters.put(option, (node, value, isDefault) -> LOGGER.debug("{} {} to {} for node {}", + isDefault ? "defaulting" : "setting", option.toIniString(), value, node)); } } } } private Map<IOption, Object> correctedMap(String node, boolean isDefault) { - return node == null ? (isDefault ? defaultMap : definedMap) - : nodeSpecificMap.computeIfAbsent(node, this::createNodeSpecificMap); + if (node == null) { + return isDefault ? defaultMap : definedMap; + } else { + ensureNode(node); + return isDefault ? nodeSpecificDefaultMap.get(node) : nodeSpecificDefinedMap.get(node); + } } public void ensureNode(String nodeId) { LOGGER.debug("ensureNode: " + nodeId); - nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap); + nodeSpecificDefinedMap.computeIfAbsent(nodeId, this::createNodeSpecificMap); + nodeSpecificDefaultMap.computeIfAbsent(nodeId, this::createNodeSpecificMap); } private Map<IOption, Object> createNodeSpecificMap(String nodeId) { @@ -352,17 +356,13 @@ public class ConfigManager implements IConfigManager, Serializable { private void applyDefaults() { LOGGER.debug("applying defaults"); sectionMap.forEach((key, value) -> { - if (key == Section.NC) { - value.values().forEach(option -> getNodeNames() - .forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node))); - for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificMap.entrySet()) { - value.values() - .forEach(option -> getOrDefault( - new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option, - nodeMap.getKey())); - } - } else { - value.values().forEach(option -> getOrDefault(configurationMap, option, null)); + value.values().forEach( + option -> getNodeNames().forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node))); + for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificDefinedMap.entrySet()) { + value.values() + .forEach(option -> getOrDefault( + new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option, + nodeMap.getKey())); } }); } @@ -433,17 +433,18 @@ public class ConfigManager implements IConfigManager, Serializable { } public List<String> getNodeNames() { - return Collections.unmodifiableList(new ArrayList<>(nodeSpecificMap.keySet())); + return Collections.unmodifiableList(new ArrayList<>(nodeSpecificDefinedMap.keySet())); } public IApplicationConfig getNodeEffectiveConfig(String nodeId) { - final Map<IOption, Object> nodeMap = nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap); + ensureNode(nodeId); + final Map<IOption, Object> nodeMap = nodeSpecificDefaultMap.get(nodeId); Map<IOption, Object> nodeEffectiveMap = getNodeEffectiveMap(nodeId); return new ConfigManagerApplicationConfig(this) { @Override public Object getStatic(IOption option) { if (!nodeEffectiveMap.containsKey(option)) { - // we need to calculate the default the the context of the node specific map... + // we need to calculate the default within the context of the node specific map... nodeMap.put(option, getOrDefault(nodeEffectiveMap, option, nodeId)); } return nodeEffectiveMap.get(option); @@ -451,8 +452,14 @@ public class ConfigManager implements IConfigManager, Serializable { }; } - private CompositeMap<IOption, Object> getNodeEffectiveMap(String nodeId) { - return new CompositeMap<>(nodeSpecificMap.get(nodeId), definedMap, new NoOpMapMutator()); + private Map<IOption, Object> getNodeEffectiveMap(String nodeId) { + ensureNode(nodeId); + CompositeMap<IOption, Object> nodeEffectiveMap = new CompositeMap<>(); + nodeEffectiveMap.setMutator(new NoOpMapMutator()); + nodeEffectiveMap.addComposited(nodeSpecificDefinedMap.get(nodeId)); + nodeEffectiveMap.addComposited(nodeSpecificDefaultMap.get(nodeId)); + nodeEffectiveMap.addComposited(definedMap); + return nodeEffectiveMap; } public Ini toIni(boolean includeDefaults) { @@ -462,8 +469,11 @@ public class ConfigManager implements IConfigManager, Serializable { ini.add(option.section().sectionName(), option.ini(), option.type().serializeToIni(value)); } }); - nodeSpecificMap.forEach((key, nodeValueMap) -> { + for (String key : getNodeNames()) { String section = Section.NC.sectionName() + "/" + key; + ensureNode(key); + Map<IOption, Object> nodeValueMap = + includeDefaults ? getNodeEffectiveMap(key) : nodeSpecificDefinedMap.get(key); synchronized (nodeValueMap) { for (Map.Entry<IOption, Object> entry : nodeValueMap.entrySet()) { if (entry.getValue() != null) { @@ -472,10 +482,9 @@ public class ConfigManager implements IConfigManager, Serializable { } } } - }); - extensionOptions.forEach((extension, options) -> { - options.forEach(option -> ini.add(extension, option.getKey(), option.getValue())); - }); + } + extensionOptions.forEach((extension, options) -> options + .forEach(option -> ini.add(extension, option.getKey(), option.getValue()))); return ini; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java index 3807a00..b188548 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java @@ -19,12 +19,16 @@ package org.apache.hyracks.control.common.config; import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.api.config.IOptionType; import org.apache.hyracks.util.StorageUtil; import org.apache.logging.log4j.Level; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; public class OptionTypes { @@ -43,6 +47,11 @@ public class OptionTypes { } @Override + public Integer parse(JsonNode node) { + return node.isNull() ? null : parse(node.asText()); + } + + @Override public Class<Integer> targetType() { return Integer.class; } @@ -65,6 +74,11 @@ public class OptionTypes { } @Override + public Long parse(JsonNode node) { + return node.isNull() ? null : parse(node.asText()); + } + + @Override public Class<Long> targetType() { return Long.class; } @@ -84,13 +98,22 @@ public class OptionTypes { @Override public Short parse(String s) { int value = Integer.decode(s); - if (Integer.highestOneBit(value) > 16) { - throw new IllegalArgumentException("The given value " + s + " is too big for a short"); + return validateShort(value); + } + + private Short validateShort(int value) { + if (value > Short.MAX_VALUE || value < Short.MIN_VALUE) { + throw new IllegalArgumentException("The given value " + value + " does not fit in a short"); } return (short) value; } @Override + public Short parse(JsonNode node) { + return node.isNull() ? null : validateShort(node.asInt()); + } + + @Override public Class<Short> targetType() { return Short.class; } @@ -108,6 +131,11 @@ public class OptionTypes { } @Override + public Integer parse(JsonNode node) { + return node.isNull() ? null : node.asInt(); + } + + @Override public Class<Integer> targetType() { return Integer.class; } @@ -125,6 +153,11 @@ public class OptionTypes { } @Override + public Double parse(JsonNode node) { + return node.isNull() ? null : node.asDouble(); + } + + @Override public Class<Double> targetType() { return Double.class; } @@ -142,6 +175,11 @@ public class OptionTypes { } @Override + public String parse(JsonNode node) { + return node.isNull() ? null : node.asText(); + } + + @Override public Class<String> targetType() { return String.class; } @@ -159,6 +197,11 @@ public class OptionTypes { } @Override + public Long parse(JsonNode node) { + return node.isNull() ? null : node.asLong(); + } + + @Override public Class<Long> targetType() { return Long.class; } @@ -176,6 +219,11 @@ public class OptionTypes { } @Override + public Boolean parse(JsonNode node) { + return node.isNull() ? null : node.asBoolean(); + } + + @Override public Class<Boolean> targetType() { return Boolean.class; } @@ -200,6 +248,11 @@ public class OptionTypes { } @Override + public Level parse(JsonNode node) { + return node.isNull() ? null : parse(node.asText()); + } + + @Override public Class<Level> targetType() { return Level.class; } @@ -227,6 +280,20 @@ public class OptionTypes { } @Override + public String[] parse(JsonNode node) { + if (node.isNull()) { + return null; + } + List<String> strings = new ArrayList<>(); + if (node instanceof ArrayNode) { + node.elements().forEachRemaining(n -> strings.add(n.asText())); + return strings.toArray(new String[strings.size()]); + } else { + return parse(node.asText()); + } + } + + @Override public Class<String[]> targetType() { return String[].class; } @@ -253,6 +320,11 @@ public class OptionTypes { } @Override + public java.net.URL parse(JsonNode node) { + return node.isNull() ? null : parse(node.asText()); + } + + @Override public Class<java.net.URL> targetType() { return java.net.URL.class; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index 519bafc..75c0827 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -50,7 +50,6 @@ public class NCConfig extends ControllerConfig { NCSERVICE_PORT(INTEGER, 9090), CLUSTER_ADDRESS(STRING, (String) null), CLUSTER_PORT(INTEGER, 1099), - CLUSTER_CONTROLLER_ID(SHORT, (short) 0x0000), CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS), CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT), NODE_ID(STRING, (String) null), @@ -144,8 +143,6 @@ public class NCConfig extends ControllerConfig { return "Cluster Controller port"; case CLUSTER_LISTEN_PORT: return "IP port to bind cluster listener"; - case CLUSTER_CONTROLLER_ID: - return "16-bit (0-65535) id of the Cluster Controller"; case CLUSTER_PUBLIC_ADDRESS: return "Public IP Address to announce cluster listener"; case CLUSTER_PUBLIC_PORT: @@ -313,10 +310,6 @@ public class NCConfig extends ControllerConfig { configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort); } - public CcId getClusterControllerId() { - return CcId.valueOf(appConfig.getShort(Option.CLUSTER_CONTROLLER_ID)); - } - public String getClusterListenAddress() { return appConfig.getString(Option.CLUSTER_LISTEN_ADDRESS); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java index bf233a8..e78a423 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java @@ -33,6 +33,8 @@ public class NodeParameters implements Serializable { private int profileDumpPeriod; + private int registrationId; + public ClusterControllerInfo getClusterControllerInfo() { return ccInfo; } @@ -64,4 +66,12 @@ public class NodeParameters implements Serializable { public void setProfileDumpPeriod(int profileDumpPeriod) { this.profileDumpPeriod = profileDumpPeriod; } + + public int getRegistrationId() { + return registrationId; + } + + public void setRegistrationId(int registrationId) { + this.registrationId = registrationId; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java index 75ef0b7..a87c30a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.job.resource.NodeCapacity; @@ -72,13 +73,15 @@ public final class NodeRegistration implements Serializable { private final NodeCapacity capacity; - private final long maxJobId; + private final int registrationId; + + private static final AtomicInteger nextRegistrationId = new AtomicInteger(); public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort, NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName, String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath, List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema, - NetworkAddress messagingPort, NodeCapacity capacity, int pid, long maxJobId) { + NetworkAddress messagingPort, NodeCapacity capacity, int pid) { this.ncAddress = ncAddress; this.nodeId = nodeId; this.ncConfig = ncConfig; @@ -100,7 +103,7 @@ public final class NodeRegistration implements Serializable { this.messagingPort = messagingPort; this.capacity = capacity; this.pid = pid; - this.maxJobId = maxJobId; + this.registrationId = nextRegistrationId.getAndIncrement(); } public InetSocketAddress getNodeControllerAddress() { @@ -187,7 +190,7 @@ public final class NodeRegistration implements Serializable { return pid; } - public long getMaxJobId() { - return maxJobId; + public int getRegistrationId() { + return registrationId; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 0fdafe3..ae40ea3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -57,11 +57,9 @@ import org.apache.hyracks.ipc.api.IIPCHandle; public class ClusterControllerRemoteProxy implements IClusterController { - private final CcId ccId; private IIPCHandle ipcHandle; - public ClusterControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) { - this.ccId = ccId; + public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) { this.ipcHandle = ipcHandle; } @@ -178,12 +176,7 @@ public class ClusterControllerRemoteProxy implements IClusterController { } @Override - public CcId getCcId() { - return ccId; - } - - @Override public String toString() { - return getClass().getSimpleName() + " " + ccId + " [" + ipcHandle.getRemoteAddress() + "]"; + return getClass().getSimpleName() + " [" + ipcHandle.getRemoteAddress() + "]"; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java new file mode 100644 index 0000000..63fffb4 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.nc; + +import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.control.common.base.IClusterController; +import org.apache.hyracks.control.common.controllers.NodeParameters; +import org.apache.hyracks.control.common.controllers.NodeRegistration; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class CcConnection { + private static final Logger LOGGER = LogManager.getLogger(); + + private final IClusterController ccs; + private boolean registrationPending; + private Exception registrationException; + private NodeParameters nodeParameters; + + CcConnection(IClusterController ccs) { + this.ccs = ccs; + } + + @Override + public String toString() { + return ccs.toString(); + } + + public CcId getCcId() { + return getNodeParameters().getClusterControllerInfo().getCcId(); + } + + synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) { + nodeParameters = parameters; + registrationException = exception; + registrationPending = false; + notifyAll(); + } + + public synchronized CcId registerNode(NodeRegistration nodeRegistration) throws Exception { + registrationPending = true; + ccs.registerNode(nodeRegistration); + while (registrationPending) { + wait(); + } + if (registrationException != null) { + LOGGER.log(Level.WARN, "Registering with {} failed with exception", this, registrationException); + throw registrationException; + } + return getCcId(); + } + + public IClusterController getClusterControllerService() { + return ccs; + } + + public NodeParameters getNodeParameters() { + return nodeParameters; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d753479f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java index ec8cf27..a03e0ce 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java @@ -53,7 +53,7 @@ public class NCDriver { LOGGER.log(Level.DEBUG, "Exception parsing command line: " + Arrays.toString(args), e); System.exit(2); } catch (Exception e) { - LOGGER.log(Level.DEBUG, "Exiting NCDriver due to exception", e); + LOGGER.error("Exiting NCDriver due to exception", e); System.exit(1); } }
