http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 36ed35d..63178e1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -32,7 +32,6 @@ import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessag import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.AsterixExtension; -import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.config.MessagingProperties; import org.apache.asterix.common.config.MetadataProperties; @@ -46,8 +45,6 @@ import org.apache.asterix.common.utils.PrintUtil; import org.apache.asterix.common.utils.Servlets; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.event.schema.cluster.Cluster; -import org.apache.asterix.event.schema.cluster.Node; import org.apache.asterix.messaging.MessagingChannelInterfaceFactory; import org.apache.asterix.messaging.NCMessageBroker; import org.apache.asterix.utils.CompatibilityUtil; @@ -117,8 +114,8 @@ public class NCApplication extends BaseNCApplication { MessagingProperties messagingProperties = runtimeContext.getMessagingProperties(); IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties); this.ncServiceCtx.setMessageBroker(messageBroker); - MessagingChannelInterfaceFactory interfaceFactory = - new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties); + MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory( + (NCMessageBroker) messageBroker, messagingProperties); this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory); final Checkpoint latestCheckpoint = runtimeContext.getTransactionSubsystem().getCheckpointManager().getLatest(); if (latestCheckpoint != null) { @@ -191,8 +188,8 @@ public class NCApplication extends BaseNCApplication { final NodeProperties nodeProperties = runtimeContext.getNodeProperties(); IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); SystemState state = recoveryMgr.getSystemState(); - if (state == SystemState.PERMANENT_DATA_LOSS && (nodeProperties.isInitialRun() || nodeProperties - .isVirtualNc())) { + if (state == SystemState.PERMANENT_DATA_LOSS + && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) { state = SystemState.BOOTSTRAPPING; } // Request registration tasks from CC @@ -218,8 +215,8 @@ public class NCApplication extends BaseNCApplication { StorageProperties storageProperties = runtimeContext.getStorageProperties(); // Deducts the reserved buffer cache size and memory component size from the maxium heap size, // and deducts one core for processing heartbeats. - long memorySize = Runtime.getRuntime().maxMemory() - storageProperties.getBufferCacheSize() - storageProperties - .getMemoryComponentGlobalBudget(); + long memorySize = Runtime.getRuntime().maxMemory() - storageProperties.getBufferCacheSize() + - storageProperties.getMemoryComponentGlobalBudget(); int allCores = Runtime.getRuntime().availableProcessors(); int maximumCoresForComputation = allCores > 1 ? allCores - 1 : allCores; return new NodeCapacity(memorySize, maximumCoresForComputation); @@ -237,54 +234,8 @@ public class NCApplication extends BaseNCApplication { private void updateOnNodeJoin() { MetadataProperties metadataProperties = runtimeContext.getMetadataProperties(); if (!metadataProperties.getNodeNames().contains(nodeId)) { - Cluster cluster = ClusterProperties.INSTANCE.getCluster(); - if (cluster == null) { - throw new IllegalStateException("No cluster configuration found for this instance"); - } NCConfig ncConfig = ((NodeControllerService) ncServiceCtx.getControllerService()).getConfiguration(); ncConfig.getConfigManager().ensureNode(nodeId); - String asterixInstanceName = metadataProperties.getInstanceName(); - TransactionProperties txnProperties = runtimeContext.getTransactionProperties(); - Node self = null; - List<Node> nodes; - if (cluster.getSubstituteNodes() != null) { - nodes = cluster.getSubstituteNodes().getNode(); - } else { - throw new IllegalStateException("Unknown node joining the cluster"); - } - for (Node node : nodes) { - String ncId = asterixInstanceName + "_" + node.getId(); - if (ncId.equalsIgnoreCase(nodeId)) { - String storeDir = StorageConstants.STORAGE_ROOT_DIR_NAME; - String nodeIoDevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices(); - String[] ioDevicePaths = nodeIoDevices.trim().split(","); - for (int i = 0; i < ioDevicePaths.length; i++) { - // construct full store path - ioDevicePaths[i] += File.separator + storeDir; - } - metadataProperties.getStores().put(nodeId, ioDevicePaths); - - String coredumpPath = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir(); - metadataProperties.getCoredumpPaths().put(nodeId, coredumpPath); - - String txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir(); - txnProperties.getLogDirectories().put(nodeId, txnLogDir); - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Store set to : " + storeDir); - LOGGER.info("Coredump dir set to : " + coredumpPath); - LOGGER.info("Transaction log dir set to :" + txnLogDir); - } - self = node; - break; - } - } - if (self != null) { - cluster.getSubstituteNodes().getNode().remove(self); - cluster.getNode().add(self); - } else { - throw new IllegalStateException("Unknown node joining the cluster"); - } } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java index 9887c57..64e9810 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java @@ -20,7 +20,6 @@ package org.apache.asterix.util; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -49,16 +48,18 @@ public class FaultToleranceUtil { List<String> primaryRemoteReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream() .map(Replica::getId).collect(Collectors.toList()); String nodeIdAddress = StringUtils.EMPTY; - Map<String, Map<IOption, Object>> ncConfiguration = clusterManager.getNcConfiguration(); + int nodePort = -1; + Map<String, Map<IOption, Object>> activeNcConfiguration = clusterManager.getActiveNcConfiguration(); + // In case the node joined with a new IP address, we need to send it to the other replicas if (event == ClusterEventType.NODE_JOIN) { - nodeIdAddress = (String) ncConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS); + nodeIdAddress = (String) activeNcConfiguration.get(nodeId).get(NCConfig.Option.REPLICATION_PUBLIC_ADDRESS); + nodePort = (int) activeNcConfiguration.get(nodeId).get(NCConfig.Option.REPLICATION_PUBLIC_PORT); } - final Set<String> participantNodes = clusterManager.getParticipantNodes(); - ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event); + ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, nodePort, event); for (String replica : primaryRemoteReplicas) { // If the remote replica is alive, send the event - if (participantNodes.contains(replica)) { + if (activeNcConfiguration.containsKey(replica)) { try { messageBroker.sendApplicationMessageToNC(msg, replica); } catch (Exception e) { @@ -69,4 +70,4 @@ public class FaultToleranceUtil { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/resources/cc-rep.conf ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/resources/cc-rep.conf b/asterixdb/asterix-app/src/main/resources/cc-rep.conf new file mode 100644 index 0000000..9c093dc --- /dev/null +++ b/asterixdb/asterix-app/src/main/resources/cc-rep.conf @@ -0,0 +1,52 @@ +; Licensed to the Apache Software Foundation (ASF) under one +; or more contributor license agreements. See the NOTICE file +; distributed with this work for additional information +; regarding copyright ownership. The ASF licenses this file +; to you under the Apache License, Version 2.0 (the +; "License"); you may not use this file except in compliance +; with the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, +; software distributed under the License is distributed on an +; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +; KIND, either express or implied. See the License for the +; specific language governing permissions and limitations +; under the License. + +[nc/asterix_nc1] +txn.log.dir=../asterix-server/target/tmp/asterix_nc1/txnlog +core.dump.dir=../asterix-server/target/tmp/asterix_nc1/coredump +iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006 +replication.listen.port=2001 +nc.api.port=19004 + +[nc/asterix_nc2] +ncservice.port=9091 +txn.log.dir=../asterix-server/target/tmp/asterix_nc2/txnlog +core.dump.dir=../asterix-server/target/tmp/asterix_nc2/coredump +iodevices=../asterix-server/target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007 +replication.listen.port=2002 +nc.api.port=19005 + +[nc] +address=127.0.0.1 +command=asterixnc +app.class=org.apache.asterix.hyracks.bootstrap.NCApplication +jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory" +storage.subdir=test_storage +storage.memorycomponent.globalbudget = 1073741824 + +[cc] +address = 127.0.0.1 +app.class=org.apache.asterix.hyracks.bootstrap.CCApplication +heartbeat.period=2000 + +[common] +log.level = INFO +replication.enabled=true +replication.strategy=metadata_only +replication.factor=2 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/resources/cc.conf ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf b/asterixdb/asterix-app/src/main/resources/cc.conf new file mode 100644 index 0000000..57260a4 --- /dev/null +++ b/asterixdb/asterix-app/src/main/resources/cc.conf @@ -0,0 +1,56 @@ +; Licensed to the Apache Software Foundation (ASF) under one +; or more contributor license agreements. See the NOTICE file +; distributed with this work for additional information +; regarding copyright ownership. The ASF licenses this file +; to you under the Apache License, Version 2.0 (the +; "License"); you may not use this file except in compliance +; with the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, +; software distributed under the License is distributed on an +; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +; KIND, either express or implied. See the License for the +; specific language governing permissions and limitations +; under the License. + +[nc/asterix_nc1] +txn.log.dir=target/tmp/asterix_nc1/txnlog +core.dump.dir=target/tmp/asterix_nc1/coredump +iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2 +nc.api.port=19004 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006 + +[nc/asterix_nc2] +ncservice.port=9091 +txn.log.dir=target/tmp/asterix_nc2/txnlog +core.dump.dir=target/tmp/asterix_nc2/coredump +iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2 +nc.api.port=19005 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007 + +[nc] +address=127.0.0.1 +command=asterixnc +app.class=org.apache.asterix.hyracks.bootstrap.NCApplication +jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory" +storage.buffercache.pagesize=32KB +storage.buffercache.size=48MB +storage.memorycomponent.numpages=16 +storage.memorycomponent.globalbudget=512MB + +[cc] +address = 127.0.0.1 +app.class=org.apache.asterix.hyracks.bootstrap.CCApplication +heartbeat.period=2000 + +[common] +log.level = INFO +compiler.framesize=32KB +compiler.sortmemory=320KB +compiler.groupmemory=160KB +compiler.joinmemory=256KB +messaging.frame.size=4096 +messaging.frame.count=512 + http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/resources/cc2.conf ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/resources/cc2.conf b/asterixdb/asterix-app/src/main/resources/cc2.conf new file mode 100644 index 0000000..41beda7 --- /dev/null +++ b/asterixdb/asterix-app/src/main/resources/cc2.conf @@ -0,0 +1,56 @@ +; Licensed to the Apache Software Foundation (ASF) under one +; or more contributor license agreements. See the NOTICE file +; distributed with this work for additional information +; regarding copyright ownership. The ASF licenses this file +; to you under the Apache License, Version 2.0 (the +; "License"); you may not use this file except in compliance +; with the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, +; software distributed under the License is distributed on an +; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +; KIND, either express or implied. See the License for the +; specific language governing permissions and limitations +; under the License. + +[nc/asterix_nc1] +txn.log.dir=target/tmp/asterix_nc1/txnlog +core.dump.dir=target/tmp/asterix_nc1/coredump +iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2 +nc.api.port=19004 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006 + +[nc/asterix_nc2] +ncservice.port=9091 +txn.log.dir=target/tmp/asterix_nc2/txnlog +core.dump.dir=target/tmp/asterix_nc2/coredump +iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2 +nc.api.port=19005 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007 + +[nc] +address=127.0.0.1 +command=asterixnc +app.class=org.apache.asterix.hyracks.bootstrap.NCApplication +jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory" +storage.buffercache.pagesize=32KB +storage.buffercache.size=48MB +storage.memorycomponent.numpages=16 +storage.memorycomponent.globalbudget=512MB + +[cc] +address = 127.0.0.1 +app.class=org.apache.asterix.hyracks.bootstrap.CCApplication +heartbeat.period=2000 + +[common] +log.level = WARNING +compiler.framesize=32KB +compiler.sortmemory=320KB +compiler.groupmemory=160KB +compiler.joinmemory=256KB +compiler.parallelism=-1 +messaging.frame.size=4096 +messaging.frame.count=512 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/resources/cc3.conf ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/resources/cc3.conf b/asterixdb/asterix-app/src/main/resources/cc3.conf new file mode 100644 index 0000000..4c38081 --- /dev/null +++ b/asterixdb/asterix-app/src/main/resources/cc3.conf @@ -0,0 +1,56 @@ +; Licensed to the Apache Software Foundation (ASF) under one +; or more contributor license agreements. See the NOTICE file +; distributed with this work for additional information +; regarding copyright ownership. The ASF licenses this file +; to you under the Apache License, Version 2.0 (the +; "License"); you may not use this file except in compliance +; with the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, +; software distributed under the License is distributed on an +; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +; KIND, either express or implied. See the License for the +; specific language governing permissions and limitations +; under the License. + +[nc/asterix_nc1] +txn.log.dir=target/tmp/asterix_nc1/txnlog +core.dump.dir=target/tmp/asterix_nc1/coredump +iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2 +nc.api.port=19004 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006 + +[nc/asterix_nc2] +ncservice.port=9091 +txn.log.dir=target/tmp/asterix_nc2/txnlog +core.dump.dir=target/tmp/asterix_nc2/coredump +iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2 +nc.api.port=19005 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007 + +[nc] +address=127.0.0.1 +command=asterixnc +app.class=org.apache.asterix.hyracks.bootstrap.NCApplication +jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory" +storage.buffercache.pagesize=32KB +storage.buffercache.size=48MB +storage.memorycomponent.numpages=16 +storage.memorycomponent.globalbudget=512MB + +[cc] +address = 127.0.0.1 +app.class=org.apache.asterix.hyracks.bootstrap.CCApplication +heartbeat.period=2000 + +[common] +log.level = WARNING +compiler.framesize=32KB +compiler.sortmemory=320KB +compiler.groupmemory=160KB +compiler.joinmemory=256KB +compiler.parallelism=3 +messaging.frame.size=4096 +messaging.frame.count=512 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/resources/cc4.conf ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/resources/cc4.conf b/asterixdb/asterix-app/src/main/resources/cc4.conf new file mode 100644 index 0000000..7ed2d20 --- /dev/null +++ b/asterixdb/asterix-app/src/main/resources/cc4.conf @@ -0,0 +1,53 @@ +; Licensed to the Apache Software Foundation (ASF) under one +; or more contributor license agreements. See the NOTICE file +; distributed with this work for additional information +; regarding copyright ownership. The ASF licenses this file +; to you under the Apache License, Version 2.0 (the +; "License"); you may not use this file except in compliance +; with the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, +; software distributed under the License is distributed on an +; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +; KIND, either express or implied. See the License for the +; specific language governing permissions and limitations +; under the License. + +[nc/asterix_nc1] +txn.log.dir=target/tmp/asterix_nc1/txnlog +core.dump.dir=target/tmp/asterix_nc1/coredump +iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2 +nc.api.port=19004 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006 + +[nc/asterix_nc2] +ncservice.port=9091 +txn.log.dir=target/tmp/asterix_nc2/txnlog +core.dump.dir=target/tmp/asterix_nc2/coredump +iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2 +nc.api.port=19005 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007 + +[nc] +address=127.0.0.1 +command=asterixnc +app.class=org.apache.asterix.hyracks.bootstrap.NCApplication +jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory" +storage.buffercache.pagesize=32KB + +[cc] +address = 127.0.0.1 +app.class=org.apache.asterix.hyracks.bootstrap.CCApplication +heartbeat.period=2000 + +[common] +log.level = INFO +compiler.framesize=32KB +compiler.sortmemory=320KB +compiler.groupmemory=160KB +compiler.joinmemory=256KB +messaging.frame.size=4096 +messaging.frame.count=512 +compiler.parallelism=-1 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index a1c2ee6..fddab14 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.logging.Logger; import org.apache.asterix.app.external.ExternalUDFLibrarian; @@ -66,6 +67,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory; import org.apache.asterix.transaction.management.runtime.CommitRuntime; import org.apache.asterix.transaction.management.service.logging.LogReader; +import org.apache.avro.generic.GenericData; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; @@ -74,6 +76,7 @@ import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; @@ -117,7 +120,7 @@ public class TestNodeController { protected static final String PATH_ACTUAL = "unittest" + File.separator; protected static final String PATH_BASE = FileUtil.joinPath("src", "test", "resources", "nodetests"); - protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; + protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf"; protected static TransactionProperties txnProperties; private static final boolean cleanupOnStart = true; private static final boolean cleanupOnStop = true; @@ -132,6 +135,7 @@ public class TestNodeController { private long jobCounter = 100L; private final String testConfigFileName; private final boolean runHDFS; + private final List<Pair<IOption, Object>> options = new ArrayList<>(); public TestNodeController(String testConfigFileName, boolean runHDFS) { this.testConfigFileName = testConfigFileName; @@ -146,7 +150,7 @@ public class TestNodeController { ExternalUDFLibrarian.removeLibraryDir(); ExecutionTestUtil.setUp(cleanupOnStart, testConfigFileName == null ? TEST_CONFIG_FILE_NAME : testConfigFileName, - ExecutionTestUtil.integrationUtil, runHDFS); + ExecutionTestUtil.integrationUtil, runHDFS, options); } catch (Throwable th) { th.printStackTrace(); throw th; @@ -158,6 +162,10 @@ public class TestNodeController { ExecutionTestUtil.tearDown(cleanupOnStop); } + public void setOpts(List<Pair<IOption, Object>> opts) { + options.addAll(opts); + } + public TxnId getTxnJobId(IHyracksTaskContext ctx) { return getTxnJobId(ctx.getJobletContext().getJobId()); } @@ -171,30 +179,29 @@ public class TestNodeController { int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, StorageComponentProvider storageComponentProvider, Index secondaryIndex) throws AlgebricksException, HyracksDataException, RemoteException, ACIDException { - CcApplicationContext appCtx = - (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); + CcApplicationContext appCtx = (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc + .getApplicationContext(); MetadataProvider mdProvider = new MetadataProvider(appCtx, null); try { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); + org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = DatasetUtil + .getMergePolicyFactory(dataset, mdTxnCtx); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators); IndexOperation op = IndexOperation.INSERT; - IModificationOperationCallbackFactory modOpCallbackFactory = - new PrimaryIndexModificationOperationCallbackFactory(dataset.getDatasetId(), - primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op), - ResourceType.LSM_BTREE); + IModificationOperationCallbackFactory modOpCallbackFactory = new PrimaryIndexModificationOperationCallbackFactory( + dataset.getDatasetId(), primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, + Operation.get(op), ResourceType.LSM_BTREE); IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider(); - RecordDescriptor recordDesc = - recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0); + RecordDescriptor recordDesc = recordDescProvider + .getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0); IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); - LSMInsertDeleteOperatorNodePushable insertOp = - new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(), - primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, op, true, - indexHelperFactory, modOpCallbackFactory, null); + LSMInsertDeleteOperatorNodePushable insertOp = new LSMInsertDeleteOperatorNodePushable(ctx, + ctx.getTaskAttemptId().getTaskId().getPartition(), + primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, op, true, indexHelperFactory, + modOpCallbackFactory, null); // For now, this assumes a single secondary index. recordDesc is always <pk-record-meta> // for the index, we will have to create an assign operator that extract the sk @@ -202,8 +209,8 @@ public class TestNodeController { if (secondaryIndex != null) { List<List<String>> skNames = secondaryIndex.getKeyFieldNames(); List<Integer> indicators = secondaryIndex.getKeyFieldSourceIndicators(); - IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories = - new IScalarEvaluatorFactory[skNames.size()]; + IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[skNames + .size()]; for (int i = 0; i < skNames.size(); i++) { ARecordType sourceType = dataset.hasMetaPart() ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordType : metaType @@ -226,18 +233,17 @@ public class TestNodeController { for (int i = 0; i < primaryIndexInfo.index.getKeyFieldNames().size(); i++) { projectionList[projCount++] = i; } - IPushRuntime assignOp = - new AssignRuntimeFactory(outColumns, secondaryFieldAccessEvalFactories, projectionList, true) - .createPushRuntime(ctx); + IPushRuntime assignOp = new AssignRuntimeFactory(outColumns, secondaryFieldAccessEvalFactories, + projectionList, true).createPushRuntime(ctx); insertOp.setOutputFrameWriter(0, assignOp, primaryIndexInfo.rDesc); assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc); SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex); IIndexDataflowHelperFactory secondaryIndexHelperFactory = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider); - LSMInsertDeleteOperatorNodePushable secondaryInsertOp = - new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(), - secondaryIndexInfo.insertFieldsPermutations, secondaryIndexInfo.rDesc, op, false, - secondaryIndexHelperFactory, NoOpOperationCallbackFactory.INSTANCE, null); + LSMInsertDeleteOperatorNodePushable secondaryInsertOp = new LSMInsertDeleteOperatorNodePushable(ctx, + ctx.getTaskAttemptId().getTaskId().getPartition(), secondaryIndexInfo.insertFieldsPermutations, + secondaryIndexInfo.rDesc, op, false, secondaryIndexHelperFactory, + NoOpOperationCallbackFactory.INSTANCE, null); assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc); CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), secondaryIndexInfo.primaryKeyIndexes, true, ctx.getTaskAttemptId().getTaskId().getPartition(), @@ -272,9 +278,9 @@ public class TestNodeController { BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc, null, null, true, true, indexDataflowHelperFactory, false, false, null, NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false); - BTreeSearchOperatorNodePushable searchOp = - searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), - ctx.getTaskAttemptId().getTaskId().getPartition(), 1); + BTreeSearchOperatorNodePushable searchOp = searchOpDesc.createPushRuntime(ctx, + primaryIndexInfo.getSearchRecordDescriptorProvider(), ctx.getTaskAttemptId().getTaskId().getPartition(), + 1); emptyTupleOp.setOutputFrameWriter(0, searchOp, primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0)); searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc); @@ -294,8 +300,8 @@ public class TestNodeController { Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP); Index index = primaryIndexInfo.getIndex(); - CcApplicationContext appCtx = - (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); + CcApplicationContext appCtx = (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc + .getApplicationContext(); MetadataProvider mdProvider = new MetadataProvider(appCtx, dataverse); try { return dataset.getResourceFactory(mdProvider, index, primaryIndexInfo.recordType, primaryIndexInfo.metaType, @@ -310,8 +316,8 @@ public class TestNodeController { int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, int partition) throws AlgebricksException, HyracksDataException, RemoteException, ACIDException { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); + org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = DatasetUtil + .getMergePolicyFactory(dataset, mdTxnCtx); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators); @@ -322,9 +328,9 @@ public class TestNodeController { try { IResourceFactory resourceFactory = dataset.getResourceFactory(mdProvider, primaryIndexInfo.index, recordType, metaType, mergePolicy.first, mergePolicy.second); - IndexBuilderFactory indexBuilderFactory = - new IndexBuilderFactory(storageComponentProvider.getStorageManager(), - primaryIndexInfo.getFileSplitProvider(), resourceFactory, true); + IndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory( + storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider(), + resourceFactory, true); IHyracksTaskContext ctx = createTestContext(newJobId(), partition, false); IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition); indexBuilder.build(); @@ -338,8 +344,8 @@ public class TestNodeController { IStorageComponentProvider storageComponentProvider, int partition) throws AlgebricksException, HyracksDataException, RemoteException, ACIDException { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = - DatasetUtil.getMergePolicyFactory(primaryIndexInfo.dataset, mdTxnCtx); + org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = DatasetUtil + .getMergePolicyFactory(primaryIndexInfo.dataset, mdTxnCtx); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); Dataverse dataverse = new Dataverse(primaryIndexInfo.dataset.getDataverseName(), NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP); @@ -350,9 +356,9 @@ public class TestNodeController { IResourceFactory resourceFactory = primaryIndexInfo.dataset.getResourceFactory(mdProvider, secondaryIndex, primaryIndexInfo.recordType, primaryIndexInfo.metaType, mergePolicy.first, mergePolicy.second); - IndexBuilderFactory indexBuilderFactory = - new IndexBuilderFactory(storageComponentProvider.getStorageManager(), - secondaryIndexInfo.fileSplitProvider, resourceFactory, true); + IndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory( + storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider, resourceFactory, + true); IHyracksTaskContext ctx = createTestContext(newJobId(), partition, false); IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition); indexBuilder.build(); @@ -367,8 +373,8 @@ public class TestNodeController { int i = 0; ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields]; for (; i < primaryKeyTypes.length; i++) { - primaryIndexSerdes[i] = - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]); + primaryIndexSerdes[i] = SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(primaryKeyTypes[i]); } primaryIndexSerdes[i++] = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(recordType); if (metaType != null) { @@ -379,16 +385,16 @@ public class TestNodeController { public static ISerializerDeserializer<?>[] createSecondaryIndexSerdes(ARecordType recordType, ARecordType metaType, IAType[] primaryKeyTypes, IAType[] secondaryKeyTypes) { - ISerializerDeserializer<?>[] secondaryIndexSerdes = - new ISerializerDeserializer<?>[secondaryKeyTypes.length + primaryKeyTypes.length]; + ISerializerDeserializer<?>[] secondaryIndexSerdes = new ISerializerDeserializer<?>[secondaryKeyTypes.length + + primaryKeyTypes.length]; int i = 0; for (; i < secondaryKeyTypes.length; i++) { - secondaryIndexSerdes[i] = - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(secondaryKeyTypes[i]); + secondaryIndexSerdes[i] = SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(secondaryKeyTypes[i]); } for (; i < primaryKeyTypes.length; i++) { - secondaryIndexSerdes[i] = - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]); + secondaryIndexSerdes[i] = SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(primaryKeyTypes[i]); } return secondaryIndexSerdes; } @@ -434,8 +440,8 @@ public class TestNodeController { ctx = Mockito.spy(ctx); Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx); Mockito.when(ctx.getIoManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager()); - TaskAttemptId taskId = - new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), partition), 0); + TaskAttemptId taskId = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), partition), + 0); Mockito.when(ctx.getTaskAttemptId()).thenReturn(taskId); return ctx; } @@ -469,8 +475,8 @@ public class TestNodeController { this.primaryIndexInfo = primaryIndexInfo; this.secondaryIndex = secondaryIndex; List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId()); - CcApplicationContext appCtx = - (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); + CcApplicationContext appCtx = (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc + .getApplicationContext(); FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), primaryIndexInfo.dataset, secondaryIndex.getIndexName(), nodes); fileSplitProvider = new ConstantFileSplitProvider(splits); @@ -524,10 +530,10 @@ public class TestNodeController { this.mergePolicyProperties = mergePolicyProperties; this.primaryKeyIndexes = primaryKeyIndexes; primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1)); - primaryIndexTypeTraits = - createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType); - primaryIndexSerdes = - createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType); + primaryIndexTypeTraits = createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, + recordType, metaType); + primaryIndexSerdes = createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, + metaType); rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits); primaryIndexInsertFieldsPermutations = new int[primaryIndexNumOfTupleFields]; for (int i = 0; i < primaryIndexNumOfTupleFields; i++) { @@ -537,16 +543,16 @@ public class TestNodeController { List<IAType> keyFieldTypes = Arrays.asList(primaryKeyTypes); for (int i = 0; i < primaryKeyIndicators.size(); i++) { Integer indicator = primaryKeyIndicators.get(i); - String[] fieldNames = - indicator == Index.RECORD_INDICATOR ? recordType.getFieldNames() : metaType.getFieldNames(); + String[] fieldNames = indicator == Index.RECORD_INDICATOR ? recordType.getFieldNames() + : metaType.getFieldNames(); keyFieldNames.add(Arrays.asList(fieldNames[primaryKeyIndexes[i]])); } index = new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(), IndexType.BTREE, keyFieldNames, primaryKeyIndicators, keyFieldTypes, false, false, true, MetadataUtil.PENDING_NO_OP); List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId()); - CcApplicationContext appCtx = - (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); + CcApplicationContext appCtx = (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc + .getApplicationContext(); FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset, index.getIndexName(), nodes); fileSplitProvider = new ConstantFileSplitProvider(splits); @@ -567,8 +573,8 @@ public class TestNodeController { ISerializerDeserializer<?>[] primaryKeySerdes = new ISerializerDeserializer<?>[primaryKeyTypes.length]; for (int i = 0; i < primaryKeyTypes.length; i++) { primaryKeyTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]); - primaryKeySerdes[i] = - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]); + primaryKeySerdes[i] = SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(primaryKeyTypes[i]); } RecordDescriptor searcgRecDesc = new RecordDescriptor(primaryKeySerdes, primaryKeyTypeTraits); IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class); @@ -584,10 +590,10 @@ public class TestNodeController { public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) { int primaryIndexNumOfTupleFields = keyTypes.length + (1 + ((metaType == null) ? 0 : 1)); - ITypeTraits[] primaryIndexTypeTraits = - createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType); - ISerializerDeserializer<?>[] primaryIndexSerdes = - createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType); + ITypeTraits[] primaryIndexTypeTraits = createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, keyTypes, + recordType, metaType); + ISerializerDeserializer<?>[] primaryIndexSerdes = createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, + keyTypes, recordType, metaType); return new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits); } @@ -598,8 +604,8 @@ public class TestNodeController { } public IStorageManager getStorageManager() { - CcApplicationContext appCtx = - (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); + CcApplicationContext appCtx = (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc + .getApplicationContext(); return appCtx.getStorageManager(); } @@ -608,8 +614,8 @@ public class TestNodeController { int[] keyIndexes, List<Integer> keyIndicators, StorageComponentProvider storageComponentProvider, IFrameOperationCallbackFactory frameOpCallbackFactory, boolean hasSecondaries) throws Exception { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); + org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = DatasetUtil + .getMergePolicyFactory(dataset, mdTxnCtx); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, keyTypes, recordType, metaType, mergePolicy.first, mergePolicy.second, filterFields, keyIndexes, keyIndicators); @@ -620,13 +626,13 @@ public class TestNodeController { IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider(); IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); - LSMPrimaryUpsertOperatorNodePushable insertOp = - new LSMPrimaryUpsertOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(), - indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations, - recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), - modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, recordType, -1, - frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory, - MissingWriterFactory.INSTANCE, hasSecondaries); + LSMPrimaryUpsertOperatorNodePushable insertOp = new LSMPrimaryUpsertOperatorNodePushable(ctx, + ctx.getTaskAttemptId().getTaskId().getPartition(), indexHelperFactory, + primaryIndexInfo.primaryIndexInsertFieldsPermutations, + recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), + modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, recordType, -1, + frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory, + MissingWriterFactory.INSTANCE, hasSecondaries); RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset, filterFields == null ? 0 : filterFields.length, recordType, metaType); // fix pk fields @@ -644,8 +650,8 @@ public class TestNodeController { private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, Dataset dataset, int numFilterFields, ARecordType itemType, ARecordType metaItemType) throws Exception { - ITypeTraits[] outputTypeTraits = - new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; + ITypeTraits[] outputTypeTraits = new ITypeTraits[inputRecordDesc.getFieldCount() + + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; @@ -683,4 +689,4 @@ public class TestNodeController { } return new RecordDescriptor(outputSerDes, outputTypeTraits); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java index b80f8f8..51c512a 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java @@ -29,11 +29,8 @@ import java.util.Arrays; import java.util.List; import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; -import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.compiler.provider.AqlCompilationProvider; -import org.apache.asterix.event.schema.cluster.Cluster; -import org.apache.asterix.event.schema.cluster.MasterNode; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.statement.RunStatement; @@ -41,7 +38,9 @@ import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.client.ClusterControllerInfo; import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.api.context.ICCContext; import org.apache.hyracks.control.common.controllers.CCConfig; import org.junit.Assert; import org.junit.Test; @@ -64,19 +63,15 @@ public class QueryTranslatorTest { when(mockAsterixExternalProperties.getAPIServerPort()).thenReturn(19002); ICCServiceContext mockServiceContext = mock(ICCServiceContext.class); when(mockAsterixAppContextInfo.getServiceContext()).thenReturn(mockServiceContext); + ICCContext mockCCContext = mock(ICCContext.class); + when(mockServiceContext.getCCContext()).thenReturn(mockCCContext); + ClusterControllerInfo mockCCInfo = mock(ClusterControllerInfo.class); + when(mockCCContext.getClusterControllerInfo()).thenReturn(mockCCInfo); + when(mockCCInfo.getClientNetAddress()).thenReturn("127.0.0.1"); IApplicationConfig mockApplicationConfig = mock(IApplicationConfig.class); when(mockServiceContext.getAppConfig()).thenReturn(mockApplicationConfig); when(mockApplicationConfig.getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)).thenReturn(true); - // Mocks AsterixClusterProperties. - Cluster mockCluster = mock(Cluster.class); - MasterNode mockMasterNode = mock(MasterNode.class); - ClusterProperties mockClusterProperties = mock(ClusterProperties.class); - setFinalStaticField(ClusterProperties.class.getDeclaredField("INSTANCE"), mockClusterProperties); - when(mockClusterProperties.getCluster()).thenReturn(mockCluster); - when(mockCluster.getMasterNode()).thenReturn(mockMasterNode); - when(mockMasterNode.getClientIp()).thenReturn("127.0.0.1"); - IStatementExecutor aqlTranslator = new DefaultStatementExecutorFactory().create(mockAsterixAppContextInfo, statements, mockSessionOutput, new AqlCompilationProvider(), new StorageComponentProvider()); List<String> parameters = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java index 1466088..07d3584 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java @@ -30,7 +30,6 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.MetadataProperties; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.runtime.transaction.ResourceIdManager; import org.apache.asterix.runtime.utils.CcApplicationContext; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index e1fdb69..df7756b 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -60,26 +60,27 @@ public class ActiveStatsTest { protected boolean cleanUp = true; private static String EXPECTED_STATS = "\"Mock stats\""; + private static String CONF_PATH = "src/main/resources/cc.conf"; @Before public void setUp() throws Exception { - ExecutionTestUtil.setUp(cleanUp); + ExecutionTestUtil.setUp(cleanUp, CONF_PATH); } @Test public void refreshStatsTest() throws Exception { // Entities to be used EntityId entityId = new EntityId("MockExtension", "MockDataverse", "MockEntity"); - ActiveRuntimeId activeRuntimeId = - new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), 0); + ActiveRuntimeId activeRuntimeId = new ActiveRuntimeId(entityId, + FeedIntakeOperatorNodePushable.class.getSimpleName(), 0); List<Dataset> datasetList = new ArrayList<>(); - AlgebricksAbsolutePartitionConstraint partitionConstraint = - new AlgebricksAbsolutePartitionConstraint(new String[] { "asterix_nc1" }); + AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint( + new String[] { "asterix_nc1" }); String requestedStats; - CcApplicationContext appCtx = - (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); - ActiveNotificationHandler activeJobNotificationHandler = - (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + CcApplicationContext appCtx = (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc + .getApplicationContext(); + ActiveNotificationHandler activeJobNotificationHandler = (ActiveNotificationHandler) appCtx + .getActiveNotificationHandler(); JobId jobId = new JobId(1); // Mock ActiveRuntime @@ -103,8 +104,8 @@ public class ActiveStatsTest { entityId, datasetList, partitionConstraint, FeedIntakeOperatorNodePushable.class.getSimpleName(), NoRetryPolicyFactory.INSTANCE, null, Collections.emptyList()); // Register mock runtime - NCAppRuntimeContext nc1AppCtx = - (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext(); + NCAppRuntimeContext nc1AppCtx = (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0] + .getApplicationContext(); nc1AppCtx.getActiveManager().registerRuntime(mockRuntime); // Check init stats @@ -115,8 +116,8 @@ public class ActiveStatsTest { eventsListener.refreshStats(1000); requestedStats = eventsListener.getStats(); Assert.assertTrue(requestedStats.contains("N/A")); - WaitForStateSubscriber startingSubscriber = - new WaitForStateSubscriber(eventsListener, Collections.singleton(ActivityState.STARTING)); + WaitForStateSubscriber startingSubscriber = new WaitForStateSubscriber(eventsListener, + Collections.singleton(ActivityState.STARTING)); // Update stats of created/started job without joined partition TestUserActor user = new TestUserActor("Xikui", mdProvider, null); Action start = user.startActivity(eventsListener); @@ -127,8 +128,8 @@ public class ActiveStatsTest { requestedStats = eventsListener.getStats(); Assert.assertTrue(requestedStats.contains("N/A")); // Fake partition message and notify eventListener - ActivePartitionMessage partitionMessage = - new ActivePartitionMessage(activeRuntimeId, jobId, Event.RUNTIME_REGISTERED, null); + ActivePartitionMessage partitionMessage = new ActivePartitionMessage(activeRuntimeId, jobId, + Event.RUNTIME_REGISTERED, null); partitionMessage.handle(appCtx); start.sync(); if (start.hasFailed()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index d840daf..eaed8a6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -72,6 +72,7 @@ import org.apache.asterix.testframework.xml.ComparisonEnum; import org.apache.asterix.testframework.xml.TestCase.CompilationUnit; import org.apache.asterix.testframework.xml.TestCase.CompilationUnit.Parameter; import org.apache.asterix.testframework.xml.TestGroup; +import org.apache.avro.generic.GenericData; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.ByteArrayOutputStream; @@ -108,13 +109,13 @@ public class TestExecutor { // see // https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers/417184 private static final long MAX_URL_LENGTH = 2000l; - private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = - Pattern.compile("/\\*.*\\*/", Pattern.MULTILINE | Pattern.DOTALL); - private static final Pattern JAVA_SHELL_SQL_LINE_COMMENT_PATTERN = - Pattern.compile("^(//|#|--).*$", Pattern.MULTILINE); + private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = Pattern.compile("/\\*.*\\*/", + Pattern.MULTILINE | Pattern.DOTALL); + private static final Pattern JAVA_SHELL_SQL_LINE_COMMENT_PATTERN = Pattern.compile("^(//|#|--).*$", + Pattern.MULTILINE); private static final Pattern REGEX_LINES_PATTERN = Pattern.compile("^(-)?/(.*)/([im]*)$"); - private static final Pattern POLL_TIMEOUT_PATTERN = - Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)", Pattern.MULTILINE); + private static final Pattern POLL_TIMEOUT_PATTERN = Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)", + Pattern.MULTILINE); private static final Pattern POLL_DELAY_PATTERN = Pattern.compile("polldelaysecs=(\\d+)(\\D|$)", Pattern.MULTILINE); private static final Pattern HANDLE_VARIABLE_PATTERN = Pattern.compile("handlevariable=(\\w+)"); private static final Pattern VARIABLE_REF_PATTERN = Pattern.compile("\\$(\\w+)"); @@ -128,7 +129,6 @@ public class TestExecutor { public static final String DELIVERY_IMMEDIATE = "immediate"; private static final String METRICS_QUERY_TYPE = "metrics"; - private static Method managixExecuteMethod = null; private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>(); private static Map<String, InetSocketAddress> ncEndPoints; private static Map<String, InetSocketAddress> replicationAddress; @@ -186,10 +186,10 @@ public class TestExecutor { public void runScriptAndCompareWithResult(File scriptFile, PrintWriter print, File expectedFile, File actualFile, ComparisonEnum compare) throws Exception { System.err.println("Expected results file: " + expectedFile.toString()); - BufferedReader readerExpected = - new BufferedReader(new InputStreamReader(new FileInputStream(expectedFile), "UTF-8")); - BufferedReader readerActual = - new BufferedReader(new InputStreamReader(new FileInputStream(actualFile), "UTF-8")); + BufferedReader readerExpected = new BufferedReader( + new InputStreamReader(new FileInputStream(expectedFile), "UTF-8")); + BufferedReader readerActual = new BufferedReader( + new InputStreamReader(new FileInputStream(actualFile), "UTF-8")); boolean regex = false; try { if (ComparisonEnum.BINARY.equals(compare)) { @@ -372,10 +372,10 @@ public class TestExecutor { public void runScriptAndCompareWithResultRegex(File scriptFile, File expectedFile, File actualFile) throws Exception { String lineExpected, lineActual; - try (BufferedReader readerExpected = - new BufferedReader(new InputStreamReader(new FileInputStream(expectedFile), "UTF-8")); - BufferedReader readerActual = - new BufferedReader(new InputStreamReader(new FileInputStream(actualFile), "UTF-8"))) { + try (BufferedReader readerExpected = new BufferedReader( + new InputStreamReader(new FileInputStream(expectedFile), "UTF-8")); + BufferedReader readerActual = new BufferedReader( + new InputStreamReader(new FileInputStream(actualFile), "UTF-8"))) { StringBuilder actual = new StringBuilder(); while ((lineActual = readerActual.readLine()) != null) { actual.append(lineActual).append('\n'); @@ -715,8 +715,8 @@ public class TestExecutor { // Insert and Delete statements are executed here public void executeUpdate(String str, URI uri) throws Exception { // Create a method instance. - HttpUriRequest request = - RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)).build(); + HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)) + .build(); // Execute the method. executeAndCheckHttpRequest(request); @@ -726,10 +726,10 @@ public class TestExecutor { public InputStream executeAnyAQLAsync(String statement, boolean defer, OutputFormat fmt, URI uri, Map<String, Object> variableCtx) throws Exception { // Create a method instance. - HttpUriRequest request = - RequestBuilder.post(uri).addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous") - .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)) - .setHeader("Accept", fmt.mimeType()).build(); + HttpUriRequest request = RequestBuilder.post(uri) + .addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous") + .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType()) + .build(); String handleVar = getHandleVariable(statement); @@ -755,8 +755,8 @@ public class TestExecutor { // create function statement public void executeDDL(String str, URI uri) throws Exception { // Create a method instance. - HttpUriRequest request = - RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)).build(); + HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)) + .build(); // Execute the method. executeAndCheckHttpRequest(request); @@ -766,8 +766,8 @@ public class TestExecutor { // and returns the contents as a string // This string is later passed to REST API for execution. public String readTestFile(File testFile) throws Exception { - BufferedReader reader = - new BufferedReader(new InputStreamReader(new FileInputStream(testFile), StandardCharsets.UTF_8)); + BufferedReader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(testFile), StandardCharsets.UTF_8)); String line; StringBuilder stringBuilder = new StringBuilder(); String ls = System.getProperty("line.separator"); @@ -779,15 +779,6 @@ public class TestExecutor { return stringBuilder.toString(); } - public static void executeManagixCommand(String command) throws ClassNotFoundException, NoSuchMethodException, - SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { - if (managixExecuteMethod == null) { - Class<?> clazz = Class.forName("org.apache.asterix.installer.test.AsterixInstallerIntegrationUtil"); - managixExecuteMethod = clazz.getMethod("executeCommand", String.class); - } - managixExecuteMethod.invoke(null, command); - } - public static String executeScript(ProcessBuilder pb, String scriptPath) throws Exception { LOGGER.info("Executing script: " + scriptPath); pb.command(scriptPath); @@ -795,22 +786,6 @@ public class TestExecutor { return getProcessOutput(p); } - private static String executeVagrantScript(ProcessBuilder pb, String node, String scriptName) throws Exception { - pb.command("vagrant", "ssh", node, "--", pb.environment().get("SCRIPT_HOME") + scriptName); - Process p = pb.start(); - p.waitFor(); - InputStream input = p.getInputStream(); - return IOUtils.toString(input, StandardCharsets.UTF_8.name()); - } - - private static String executeVagrantManagix(ProcessBuilder pb, String command) throws Exception { - pb.command("vagrant", "ssh", "cc", "--", pb.environment().get("MANAGIX_HOME") + command); - Process p = pb.start(); - p.waitFor(); - InputStream input = p.getInputStream(); - return IOUtils.toString(input, StandardCharsets.UTF_8.name()); - } - private static String getScriptPath(String queryPath, String scriptBasePath, String scriptFileName) { String targetWord = "queries" + File.separator; int targetWordSize = targetWord.lastIndexOf(File.separator); @@ -822,8 +797,8 @@ public class TestExecutor { private static String getProcessOutput(Process p) throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Future<Integer> future = - Executors.newSingleThreadExecutor().submit(() -> IOUtils.copy(p.getInputStream(), new OutputStream() { + Future<Integer> future = Executors.newSingleThreadExecutor() + .submit(() -> IOUtils.copy(p.getInputStream(), new OutputStream() { @Override public void write(int b) throws IOException { baos.write(b); @@ -917,10 +892,6 @@ public class TestExecutor { expectedResultFile, actualResultFile, queryCount, expectedResultFileCtxs.size(), cUnit.getParameter(), ComparisonEnum.TEXT); break; - case "mgx": - executeManagixCommand(stripLineComments(statement).trim()); - Thread.sleep(8000); - break; case "txnqbc": // qbc represents query before crash InputStream resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit), getEndpoint(Servlets.AQL_QUERY), cUnit.getParameter()); @@ -979,28 +950,6 @@ public class TestExecutor { } System.err.println("...but that was expected."); break; - case "vscript": // a script that will be executed on a vagrant virtual node - try { - String[] command = stripLineComments(statement).trim().split(" "); - if (command.length != 2) { - throw new Exception("invalid vagrant script format"); - } - String nodeId = command[0]; - String scriptName = command[1]; - String output = executeVagrantScript(pb, nodeId, scriptName); - if (output.contains("ERROR")) { - throw new Exception(output); - } - } catch (Exception e) { - throw new Exception("Test \"" + testFile + "\" FAILED!\n", e); - } - break; - case "vmgx": // a managix command that will be executed on vagrant cc node - String output = executeVagrantManagix(pb, stripLineComments(statement).trim()); - if (output.contains("ERROR")) { - throw new Exception(output); - } - break; case "get": case "post": case "put": @@ -1094,8 +1043,13 @@ public class TestExecutor { command = stripJavaComments(statement).trim().split(" "); String commandType = command[0]; String nodeId = command[1]; - if (commandType.equals("kill")) { - killNC(nodeId, cUnit); + switch (commandType) { + case "kill": + killNC(nodeId, cUnit); + break; + case "start": + startNC(nodeId); + break; } break; case "loop": @@ -1403,9 +1357,9 @@ public class TestExecutor { return executeJSON(fmt, ctxType.toUpperCase(), uri, params, responseCodeValidator); } - private void killNC(String nodeId, CompilationUnit cUnit) throws Exception { + public void killNC(String nodeId, CompilationUnit cUnit) throws Exception { //get node process id - OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit); + OutputFormat fmt = OutputFormat.CLEAN_JSON; String endpoint = "/admin/cluster/node/" + nodeId + "/config"; InputStream executeJSONGet = executeJSONGet(fmt, createEndpointURI(endpoint, null)); StringWriter actual = new StringWriter(); @@ -1421,6 +1375,18 @@ public class TestExecutor { deleteNCTxnLogs(nodeId, cUnit); } + public void startNC(String nodeId) throws Exception { + //get node process id + OutputFormat fmt = OutputFormat.CLEAN_JSON; + String endpoint = "/rest/startnode"; + List<Parameter> params = new ArrayList<>(); + Parameter node = new Parameter(); + node.setName("node"); + node.setValue(nodeId); + params.add(node); + InputStream executeJSON = executeJSON(fmt, "POST", URI.create("http://localhost:16001" + endpoint), params); + } + private void deleteNCTxnLogs(String nodeId, CompilationUnit cUnit) throws Exception { OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit); String endpoint = "/admin/cluster/node/" + nodeId + "/config"; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java index df9782a..66a34ff 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java @@ -19,7 +19,6 @@ package org.apache.asterix.test.common; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -29,13 +28,6 @@ import java.util.List; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; - -import org.apache.asterix.common.configuration.AsterixConfiguration; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.commons.compress.utils.IOUtils; import org.apache.commons.io.FileUtils; import org.apache.hyracks.util.file.FileUtil; @@ -54,6 +46,15 @@ public final class TestHelper { return false; } + public static void deleteExistingInstanceFiles() { + for (String dirName : TEST_DIRS) { + File f = new File(FileUtil.joinPath(TEST_DIR_BASE_PATH, dirName)); + if (FileUtils.deleteQuietly(f)) { + System.out.println("Dir " + f.getName() + " deleted"); + } + } + } + public static void unzip(String sourceFile, String outputDir) throws IOException { if (System.getProperty("os.name").toLowerCase().startsWith("win")) { try (ZipFile zipFile = new ZipFile(sourceFile)) { @@ -80,43 +81,4 @@ public final class TestHelper { } } } - - public static AsterixConfiguration getConfigurations(String fileName) - throws IOException, JAXBException, AsterixException { - try (InputStream is = TestHelper.class.getClassLoader().getResourceAsStream(fileName)) { - if (is != null) { - JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class); - Unmarshaller unmarshaller = ctx.createUnmarshaller(); - return (AsterixConfiguration) unmarshaller.unmarshal(is); - } else { - throw new AsterixException("Could not find configuration file " + fileName); - } - } - } - - public static void writeConfigurations(AsterixConfiguration ac, String fileName) - throws FileNotFoundException, IOException, JAXBException { - File configFile = new File(fileName); - if (!configFile.exists()) { - configFile.getParentFile().mkdirs(); - configFile.createNewFile(); - } else { - configFile.delete(); - } - try (FileOutputStream os = new FileOutputStream(fileName)) { - JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class); - Marshaller marshaller = ctx.createMarshaller(); - marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); - marshaller.marshal(ac, os); - } - } - - public static void deleteExistingInstanceFiles() { - for (String dirName : TEST_DIRS) { - File f = new File(FileUtil.joinPath(TEST_DIR_BASE_PATH, dirName)); - if (FileUtils.deleteQuietly(f)) { - System.out.println("Dir " + f.getName() + " deleted"); - } - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index e923d93..fca0848 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -90,8 +90,8 @@ public class ComponentRollbackTest { private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 }; private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" }, new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false); - private static final GenerationFunction[] RECORD_GEN_FUNCTION = - { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC }; + private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC, + GenerationFunction.DETERMINISTIC }; private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false }; private static final ARecordType META_TYPE = null; private static final GenerationFunction[] META_GEN_FUNCTION = null; @@ -133,7 +133,7 @@ public class ComponentRollbackTest { System.out.println("SetUp: "); TestHelper.deleteExistingInstanceFiles(); String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test" - + File.separator + "resources" + File.separator + "multi-partition-test-configuration.xml"; + + File.separator + "resources" + File.separator + "cc.conf"; nc = new TestNodeController(configPath, false); nc.init(); ncAppCtx = nc.getAppRuntimeContext(); @@ -152,15 +152,14 @@ public class ComponentRollbackTest { List<List<String>> partitioningKeys = new ArrayList<>(); partitioningKeys.add(Collections.singletonList("key")); int partition = 0; - dataset = - new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, - NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, - partitioningKeys, null, null, null, false, null), - null, DatasetType.INTERNAL, DATASET_ID, 0); + dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, + NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, + partitioningKeys, null, null, null, false, null), + null, DatasetType.INTERNAL, DATASET_ID, 0); PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, partition); - IndexDataflowHelperFactory iHelperFactory = - new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); + IndexDataflowHelperFactory iHelperFactory = new IndexDataflowHelperFactory(nc.getStorageManager(), + primaryIndexInfo.getFileSplitProvider()); JobId jobId = nc.newJobId(); ctx = nc.createTestContext(jobId, partition, false); indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java index 0a968c8..5e32a10 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java @@ -75,8 +75,8 @@ public class LogMarkerTest { private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 }; private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" }, new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false); - private static final GenerationFunction[] RECORD_GEN_FUNCTION = - { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC }; + private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC, + GenerationFunction.DETERMINISTIC }; private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false }; private static final ARecordType META_TYPE = null; private static final GenerationFunction[] META_GEN_FUNCTION = null; @@ -150,10 +150,10 @@ public class LogMarkerTest { } insertOp.close(); nc.getTransactionManager().commitTransaction(txnCtx.getTxnId()); - IndexDataflowHelperFactory iHelperFactory = - new IndexDataflowHelperFactory(nc.getStorageManager(), indexInfo.getFileSplitProvider()); - IIndexDataflowHelper dataflowHelper = - iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0); + IndexDataflowHelperFactory iHelperFactory = new IndexDataflowHelperFactory(nc.getStorageManager(), + indexInfo.getFileSplitProvider()); + IIndexDataflowHelper dataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), + 0); dataflowHelper.open(); LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance(); LongPointable longPointable = LongPointable.FACTORY.createPointable();
