This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new f400d33287 [ASTERIXDB-3144][RT] Implement Static Partitioning
f400d33287 is described below
commit f400d33287536d81408c1475e510c50bdd344c27
Author: Murtadha Hubail <[email protected]>
AuthorDate: Fri May 5 03:31:47 2023 +0300
[ASTERIXDB-3144][RT] Implement Static Partitioning
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Implement static partitioning based on a storage/compute
partitions map (see the illustrative sketch below).
- Fix LSMPrimaryInsertOperatorNodePushable state keeping
when operating on multiple storage partitions.
Change-Id: Ieca7ffb0f48e16fba4dc5beb0868c1ef8ac9245e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17509
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
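For illustration, a minimal sketch of the storage-to-compute assignment that
the new StorageComputePartitionsMap.computePartitionsMap (in the diff below)
performs: each compute partition receives an equal share of the storage
partitions, with the remainder going to the last compute partition. The class
and method names in this sketch are hypothetical and are not part of the
commit.

    import java.util.HashMap;
    import java.util.Map;

    public class StaticAssignmentSketch {

        // Map numStorage storage partitions onto numCompute compute partitions:
        // an equal share for each, remainder appended to the last compute partition.
        static Map<Integer, Integer> assign(int numStorage, int numCompute) {
            Map<Integer, Integer> storageToCompute = new HashMap<>();
            int perCompute = numStorage / numCompute;
            int remainder = numStorage % numCompute;
            int storagePartition = 0;
            for (int compute = 0; compute < numCompute; compute++) {
                int share = perCompute + (compute == numCompute - 1 ? remainder : 0);
                for (int i = 0; i < share; i++) {
                    storageToCompute.put(storagePartition++, compute);
                }
            }
            return storageToCompute;
        }

        public static void main(String[] args) {
            // With this commit's NUM_STORAGE_PARTITIONS = 8 over 3 compute
            // partitions: {0=0, 1=0, 2=1, 3=1, 4=2, 5=2, 6=2, 7=2}
            System.out.println(assign(8, 3));
        }
    }

Because the number of storage partitions is fixed (NUM_STORAGE_PARTITIONS = 8
in StorageConstants), the assignment is deterministic for a given cluster size
and can be computed once when the cluster state is refreshed.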
---
.../asterix/app/cc/CcApplicationContext.java | 2 +-
.../asterix/runtime/ClusterStateManagerTest.java | 11 ++
.../asterix/common/cluster/ComputePartition.java | 37 +++++++
.../common/cluster/SplitComputeLocations.java | 41 ++++++++
.../cluster/StorageComputePartitionsMap.java | 93 +++++++++++++++++
.../asterix/common/utils/StorageConstants.java | 2 +
.../adapter/factory/GenericAdapterFactory.java | 1 +
.../metadata/declared/MetadataProvider.java | 5 +-
.../metadata/utils/DataPartitioningProvider.java | 90 +++++++---------
.../utils/DynamicDataPartitioningProvider.java | 57 ++++++++++
.../utils/StaticDataPartitioningProvider.java | 116 +++++++++++++++++++++
.../LSMPrimaryInsertOperatorNodePushable.java | 38 +++----
.../asterix/runtime/utils/ClusterStateManager.java | 20 +++-
.../PersistentLocalResourceRepository.java | 2 +-
14 files changed, 440 insertions(+), 75 deletions(-)
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index a2d99a0332..e4247f0dd9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -157,7 +157,7 @@ public class CcApplicationContext implements ICcApplicationContext {
requestTracker = new RequestTracker(this);
configValidator = configValidatorFactory.create();
this.adapterFactoryService = adapterFactoryService;
- dataPartitioningProvider = new DataPartitioningProvider(this);
+ dataPartitioningProvider = DataPartitioningProvider.create(this);
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index a2a3b48393..53b9294e2b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -33,8 +33,10 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.common.utils.PartitioningScheme;
import org.apache.asterix.hyracks.bootstrap.CCApplication;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
import org.apache.asterix.runtime.utils.BulkTxnIdFactory;
@@ -231,6 +233,9 @@ public class ClusterStateManagerTest {
MetadataProperties metadataProperties = mockMetadataProperties();
Mockito.when(ccApplicationContext.getMetadataProperties()).thenReturn(metadataProperties);
+ StorageProperties storageProperties = mockStorageProperties();
+ Mockito.when(ccApplicationContext.getStorageProperties()).thenReturn(storageProperties);
+
ResourceIdManager resourceIdManager = new ResourceIdManager(csm);
Mockito.when(ccApplicationContext.getResourceIdManager()).thenReturn(resourceIdManager);
@@ -258,6 +263,12 @@ public class ClusterStateManagerTest {
return metadataProperties;
}
+ private StorageProperties mockStorageProperties() {
+ StorageProperties storageProperties = Mockito.mock(StorageProperties.class);
+ Mockito.when(storageProperties.getPartitioningScheme()).thenReturn(PartitioningScheme.DYNAMIC);
+ return storageProperties;
+ }
+
private NcLocalCounters mockLocalCounters() {
final NcLocalCounters localCounters = Mockito.mock(NcLocalCounters.class);
Mockito.when(localCounters.getMaxJobId()).thenReturn(1000L);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
new file mode 100644
index 0000000000..1d11c302cc
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+public class ComputePartition {
+ private final String nodeId;
+ private final int id;
+
+ public ComputePartition(int id, String nodeId) {
+ this.id = id;
+ this.nodeId = nodeId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public int getId() {
+ return id;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
new file mode 100644
index 0000000000..b58c39f66f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class SplitComputeLocations {
+
+ private final IFileSplitProvider splitsProvider;
+ private final AlgebricksPartitionConstraint constraints;
+
+ public SplitComputeLocations(IFileSplitProvider splitsProvider, AlgebricksPartitionConstraint constraints) {
+ this.splitsProvider = splitsProvider;
+ this.constraints = constraints;
+ }
+
+ public IFileSplitProvider getSplitsProvider() {
+ return splitsProvider;
+ }
+
+ public AlgebricksPartitionConstraint getConstraints() {
+ return constraints;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
new file mode 100644
index 0000000000..874371e09a
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.utils.StorageConstants;
+
+public class StorageComputePartitionsMap {
+
+ private final Map<Integer, ComputePartition> stoToComputeLocation = new HashMap<>();
+
+ public void addStoragePartition(int stoPart, ComputePartition compute) {
+ stoToComputeLocation.put(stoPart, compute);
+ }
+
+ public int[][] getComputeToStorageMap(boolean metadataDataset) {
+ Map<Integer, List<Integer>> computeToStoragePartitions = new HashMap<>();
+ if (metadataDataset) {
+ final int computePartitionIdForMetadata = 0;
+ computeToStoragePartitions.put(computePartitionIdForMetadata,
+ Collections.singletonList(computePartitionIdForMetadata));
+ } else {
+ for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) {
+ ComputePartition computePartition = getComputePartition(i);
+ int computeId = computePartition.getId();
+ List<Integer> storagePartitions =
+ computeToStoragePartitions.computeIfAbsent(computeId, k -> new ArrayList<>());
+ storagePartitions.add(i);
+ }
+ }
+ int[][] computeToStoArray = new int[computeToStoragePartitions.size()][];
+ for (Map.Entry<Integer, List<Integer>> integerListEntry : computeToStoragePartitions.entrySet()) {
+ computeToStoArray[integerListEntry.getKey()] =
+ integerListEntry.getValue().stream().mapToInt(i -> i).toArray();
+ }
+ return computeToStoArray;
+ }
+
+ public ComputePartition getComputePartition(int storagePartition) {
+ return stoToComputeLocation.get(storagePartition);
+ }
+
+ public static StorageComputePartitionsMap computePartitionsMap(IClusterStateManager clusterStateManager) {
+ ClusterPartition metadataPartition = clusterStateManager.getMetadataPartition();
+ Map<Integer, ClusterPartition> clusterPartitions = clusterStateManager.getClusterPartitions();
+ StorageComputePartitionsMap newMap = new StorageComputePartitionsMap();
+ newMap.addStoragePartition(metadataPartition.getPartitionId(),
+ new ComputePartition(metadataPartition.getPartitionId(), metadataPartition.getActiveNodeId()));
+ int storagePartitionsPerComputePartition = StorageConstants.NUM_STORAGE_PARTITIONS / clusterPartitions.size();
+ int storagePartitionId = 0;
+ int lastComputePartition = 1;
+ int remainingStoragePartition = StorageConstants.NUM_STORAGE_PARTITIONS % clusterPartitions.size();
+ for (Map.Entry<Integer, ClusterPartition> cp : clusterPartitions.entrySet()) {
+ ClusterPartition clusterPartition = cp.getValue();
+ for (int i = 0; i < storagePartitionsPerComputePartition; i++) {
+ newMap.addStoragePartition(storagePartitionId,
+ new ComputePartition(clusterPartition.getPartitionId(), clusterPartition.getActiveNodeId()));
+ storagePartitionId++;
+ }
+ if (lastComputePartition == clusterPartitions.size() && remainingStoragePartition != 0) {
+ // assign all remaining partitions to last compute partition
+ for (int k = 0; k < remainingStoragePartition; k++) {
+ newMap.addStoragePartition(storagePartitionId, new ComputePartition(
+ clusterPartition.getPartitionId(), clusterPartition.getActiveNodeId()));
+ storagePartitionId++;
+ }
+ }
+ lastComputePartition++;
+ }
+ return newMap;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 2d231d3fb8..c26fe76156 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -46,6 +46,8 @@ public class StorageConstants {
public static final String DEFAULT_COMPACTION_POLICY_NAME = ConcurrentMergePolicyFactory.NAME;
public static final String DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME = "correlated-prefix";
public static final Map<String, String> DEFAULT_COMPACTION_POLICY_PROPERTIES;
+ public static final int METADATA_PARTITION = -1;
+ public static final int NUM_STORAGE_PARTITIONS = 8;
/**
* The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 9ca87b873f..3ac8a0292e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -145,6 +145,7 @@ public class GenericAdapterFactory implements ITypedAdapterFactory {
throws HyracksDataException, AlgebricksException {
this.isFeed = ExternalDataUtils.isFeed(configuration);
if (isFeed) {
+ //TODO(partitioning) make this code reuse DataPartitioningProvider
feedLogFileSplits = FeedUtils.splitsForAdapter(appCtx, ExternalDataUtils.getDatasetDataverse(configuration),
ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint());
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index a8df92aa2e..0b011d0c0d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -933,7 +933,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
throws AlgebricksException {
- return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx, appCtx.getClusterStateManager());
+ return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, dataset, indexName).getSpiltsProvider()
+ .getFileSplits();
}
public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
@@ -1788,8 +1789,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
}
public PartitioningProperties getPartitioningProperties(Dataset ds, String indexName) throws AlgebricksException {
- //TODO(partitioning) pass splits rather than mdTxnCtx?
- // FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName);
return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, ds, indexName);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index ec4c985369..7257958f1d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -18,9 +18,6 @@
*/
package org.apache.asterix.metadata.utils;
-import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC;
-import static org.apache.asterix.common.utils.PartitioningScheme.STATIC;
-
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
@@ -31,78 +28,71 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-public class DataPartitioningProvider implements IDataPartitioningProvider {
+public abstract class DataPartitioningProvider implements IDataPartitioningProvider {
- private final ICcApplicationContext appCtx;
- private final PartitioningScheme scheme;
+ protected final ICcApplicationContext appCtx;
+ protected final ClusterStateManager clusterStateManager;
- public DataPartitioningProvider(ICcApplicationContext appCtx) {
+ DataPartitioningProvider(ICcApplicationContext appCtx) {
this.appCtx = appCtx;
- scheme = appCtx.getStorageProperties().getPartitioningScheme();
+ this.clusterStateManager = (ClusterStateManager) appCtx.getClusterStateManager();
}
- public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) {
- if (scheme == DYNAMIC) {
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil
- .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName);
- int[][] partitionsMap = getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
- return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
- } else if (scheme == STATIC) {
- throw new NotImplementedException();
+ public static DataPartitioningProvider create(ICcApplicationContext appCtx) {
+ PartitioningScheme partitioningScheme = appCtx.getStorageProperties().getPartitioningScheme();
+ switch (partitioningScheme) {
+ case DYNAMIC:
+ return new DynamicDataPartitioningProvider(appCtx);
+ case STATIC:
+ return new StaticDataPartitioningProvider(appCtx);
+ default:
+ throw new IllegalStateException("unknown partitioning scheme: " + partitioningScheme);
}
- throw new IllegalStateException();
}
- public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
- String indexName) throws AlgebricksException {
- if (scheme == DYNAMIC) {
- FileSplit[] splits =
- SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, mdTxnCtx, appCtx.getClusterStateManager());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints =
- StoragePathUtil.splitProviderAndPartitionConstraints(splits);
- int[][] partitionsMap = getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
- return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
- } else if (scheme == STATIC) {
- throw new NotImplementedException();
- }
- throw new IllegalStateException();
- }
+ public abstract PartitioningProperties getPartitioningProperties(DataverseName dataverseName);
+
+ public abstract PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+ String indexName) throws AlgebricksException;
public PartitioningProperties getPartitioningProperties(Feed feed) throws AsterixException {
- if (scheme == DYNAMIC) {
- IClusterStateManager csm = appCtx.getClusterStateManager();
- AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations();
- Set<String> nodes = new TreeSet<>(Arrays.asList(allCluster.getLocations()));
- AlgebricksAbsolutePartitionConstraint locations =
- new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
- FileSplit[] feedLogFileSplits =
- FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
- StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
- int[][] partitionsMap = getPartitionsMap(getNumPartitions(spC.second));
- return PartitioningProperties.of(spC.first, spC.second, partitionsMap);
- } else if (scheme == STATIC) {
- throw new NotImplementedException();
- }
- throw new IllegalStateException();
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations();
+ Set<String> nodes = new TreeSet<>(Arrays.asList(allCluster.getLocations()));
+ AlgebricksAbsolutePartitionConstraint locations =
+ new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
+ FileSplit[] feedLogFileSplits =
+ FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
+ StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
+ int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(spC.second));
+ return PartitioningProperties.of(spC.first, spC.second, partitionsMap);
+ }
+
+ protected static int getNumberOfPartitions(Dataset ds) {
+ return MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId())
+ ? MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS
+ : StorageConstants.NUM_STORAGE_PARTITIONS;
}
- private static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
+ protected static int getLocationsCount(AlgebricksPartitionConstraint constraint) {
if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
return ((AlgebricksCountPartitionConstraint) constraint).getCount();
} else {
@@ -110,7 +100,7 @@ public class DataPartitioningProvider implements IDataPartitioningProvider {
}
}
- private static int[][] getPartitionsMap(int numPartitions) {
+ protected static int[][] getOneToOnePartitionsMap(int numPartitions) {
int[][] map = new int[numPartitions][1];
for (int i = 0; i < numPartitions; i++) {
map[i] = new int[] { i };
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
new file mode 100644
index 0000000000..95dae4a267
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class DynamicDataPartitioningProvider extends DataPartitioningProvider {
+
+ public DynamicDataPartitioningProvider(ICcApplicationContext appCtx) {
+ super(appCtx);
+ }
+
+ @Override
+ public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) {
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil
+ .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName);
+ int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
+ return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
+ }
+
+ @Override
+ public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+ String indexName) throws AlgebricksException {
+ FileSplit[] splits =
+ SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, mdTxnCtx, appCtx.getClusterStateManager());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints =
+ StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+ int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
+ return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
new file mode 100644
index 0000000000..eaafc6ce57
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.cluster.ComputePartition;
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.cluster.SplitComputeLocations;
+import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.io.MappedFileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class StaticDataPartitioningProvider extends DataPartitioningProvider {
+
+ public StaticDataPartitioningProvider(ICcApplicationContext appCtx) {
+ super(appCtx);
+ }
+
+ @Override
+ public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) {
+ SplitComputeLocations dataverseSplits = getDataverseSplits(dataverseName);
+ StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
+ int[][] partitionsMap = partitionMap.getComputeToStorageMap(false);
+ return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(),
+ partitionsMap);
+ }
+
+ @Override
+ public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+ String indexName) throws AlgebricksException {
+ SplitComputeLocations dataverseSplits = getDatasetSplits(ds, indexName);
+ StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
+ int[][] partitionsMap = partitionMap
+ .getComputeToStorageMap(MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId()));
+ return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(),
+ partitionsMap);
+ }
+
+ private SplitComputeLocations getDataverseSplits(DataverseName dataverseName) {
+ List<FileSplit> splits = new ArrayList<>();
+ List<String> locations = new ArrayList<>();
+ Set<Integer> uniqueLocations = new HashSet<>();
+ StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
+ for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) {
+ File f = new File(StoragePathUtil.prepareStoragePartitionPath(i),
+ StoragePathUtil.prepareDataverseName(dataverseName));
+ ComputePartition computePartition = partitionMap.getComputePartition(i);
+ splits.add(new MappedFileSplit(computePartition.getNodeId(), f.getPath(), 0));
+ if (!uniqueLocations.contains(computePartition.getId())) {
+ locations.add(computePartition.getNodeId());
+ }
+ uniqueLocations.add(computePartition.getId());
+ }
+ IFileSplitProvider splitProvider = StoragePathUtil.splitProvider(splits.toArray(new FileSplit[0]));
+ AlgebricksPartitionConstraint constraints =
+ new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
+ return new SplitComputeLocations(splitProvider, constraints);
+ }
+
+ private SplitComputeLocations getDatasetSplits(Dataset dataset, String indexName) {
+ List<FileSplit> splits = new ArrayList<>();
+ List<String> locations = new ArrayList<>();
+ Set<Integer> uniqueLocations = new HashSet<>();
+ StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
+ final int datasetPartitions = getNumberOfPartitions(dataset);
+ boolean metadataDataset = MetadataIndexImmutableProperties.isMetadataDataset(dataset.getDatasetId());
+ for (int i = 0; i < datasetPartitions; i++) {
+ int storagePartition = metadataDataset ? StorageConstants.METADATA_PARTITION : i;
+ final String relPath = StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName, dataset.getRebalanceCount());
+ File f = new File(StoragePathUtil.prepareStoragePartitionPath(storagePartition), relPath);
+ ComputePartition computePartition = partitionMap.getComputePartition(storagePartition);
+ splits.add(new MappedFileSplit(computePartition.getNodeId(), f.getPath(), 0));
+ if (!uniqueLocations.contains(computePartition.getId())) {
+ locations.add(computePartition.getNodeId());
+ }
+ uniqueLocations.add(computePartition.getId());
+ }
+ IFileSplitProvider splitProvider = StoragePathUtil.splitProvider(splits.toArray(new FileSplit[0]));
+ AlgebricksPartitionConstraint constraints =
+ new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
+ return new SplitComputeLocations(splitProvider, constraints);
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 7e51ec16db..3762e82991 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -65,6 +65,8 @@ import org.apache.hyracks.storage.common.MultiComparator;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
@@ -82,11 +84,10 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe
private final IFrameOperationCallback[] frameOpCallbacks;
private boolean flushedPartialTuples;
- private int currentTupleIdx;
- private int lastFlushedTupleIdx;
-
private final PermutingFrameTupleReference keyTuple;
private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
+ private final IntSet processedTuples = new IntOpenHashSet();
+ private final IntSet flushedTuples = new IntOpenHashSet();
private final SourceLocation sourceLoc;
public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
@@ -142,7 +143,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe
@Override
public void process(FrameTupleAccessor accessor, ITupleReference tuple, int index)
throws HyracksDataException {
- if (index < currentTupleIdx) {
+ if (processedTuples.contains(index)) {
// already processed; skip
return;
}
@@ -174,7 +175,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe
throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
}
- currentTupleIdx = index + 1;
+ processedTuples.add(index);
}
@Override
@@ -197,15 +198,14 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe
@Override
public void open() throws HyracksDataException {
- currentTupleIdx = 0;
- lastFlushedTupleIdx = 0;
flushedPartialTuples = false;
accessor = new FrameTupleAccessor(inputRecDesc);
writeBuffer = new VSizeFrame(ctx);
try {
INcApplicationContext appCtx =
(INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
-
+ writer.open();
+ writerOpen = true;
for (int i = 0; i < partitions.length; i++) {
IIndexDataflowHelper indexHelper = indexHelpers[i];
indexHelper.open();
@@ -224,8 +224,6 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe
new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
}
- writer.open();
- writerOpen = true;
modCallbacks[i] = modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory
@@ -283,9 +281,9 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe
FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
}
- currentTupleIdx = 0;
- lastFlushedTupleIdx = 0;
flushedPartialTuples = false;
+ processedTuples.clear();
+ flushedTuples.clear();
}
/**
@@ -293,15 +291,17 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe
*/
@Override
public void flushPartialFrame() throws HyracksDataException {
- if (lastFlushedTupleIdx == currentTupleIdx) {
- //nothing to flush
- return;
- }
- for (int i = lastFlushedTupleIdx; i < currentTupleIdx; i++) {
- FrameUtils.appendToWriter(writer, appender, accessor, i);
+ IntList tuplesToFlush = new IntArrayList();
+ processedTuples.iterator().forEachRemaining(tIdx -> {
+ if (!flushedTuples.contains(tIdx)) {
+ tuplesToFlush.add(tIdx);
+ flushedTuples.add(tIdx);
+ }
+ });
+ for (int i = 0; i < tuplesToFlush.size(); i++) {
+ FrameUtils.appendToWriter(writer, appender, accessor, tuplesToFlush.getInt(i));
}
appender.write(writer, true);
- lastFlushedTupleIdx = currentTupleIdx;
flushedPartialTuples = true;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index d3c87ff341..984b3ce22e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -34,12 +34,15 @@ import java.util.function.Predicate;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.config.IOption;
@@ -79,6 +82,7 @@ public class ClusterStateManager implements IClusterStateManager {
private ICcApplicationContext appCtx;
private ClusterPartition metadataPartition;
private boolean rebalanceRequired;
+ private StorageComputePartitionsMap storageComputePartitionsMap;
@Override
public void setCcAppCtx(ICcApplicationContext appCtx) {
@@ -86,7 +90,14 @@ public class ClusterStateManager implements IClusterStateManager {
node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName();
- metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0];
+ PartitioningScheme partitioningScheme = appCtx.getStorageProperties().getPartitioningScheme();
+ if (partitioningScheme == PartitioningScheme.DYNAMIC) {
+ metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0];
+ } else {
+ final ClusterPartition fixedMetadataPartition = new ClusterPartition(StorageConstants.METADATA_PARTITION,
+ appCtx.getMetadataProperties().getMetadataNodeName(), 0);
+ metadataPartition = fixedMetadataPartition;
+ }
lifecycleCoordinator = appCtx.getNcLifecycleCoordinator();
lifecycleCoordinator.bindTo(this);
}
@@ -299,6 +310,9 @@ public class ClusterStateManager implements IClusterStateManager {
clusterActiveLocations.removeAll(pendingRemoval);
clusterPartitionConstraint =
new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {}));
+ if (appCtx.getStorageProperties().getPartitioningScheme() == PartitioningScheme.STATIC) {
+ storageComputePartitionsMap = StorageComputePartitionsMap.computePartitionsMap(this);
+ }
}
@Override
@@ -489,6 +503,10 @@ public class ClusterStateManager implements IClusterStateManager {
return nodeIds.stream().anyMatch(failedNodes::contains);
}
+ public synchronized StorageComputePartitionsMap getStorageComputeMap() {
+ return storageComputePartitionsMap;
+ }
+
private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) {
final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index bb3cde5944..1e813462b3 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -211,7 +211,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
public void delete(String relativePath) throws HyracksDataException {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
boolean resourceExists = resourceFile.getFile().exists();
- if (resourceExists) {
+ if (isReplicationEnabled && resourceExists) {
try {
createReplicationJob(ReplicationOperation.DELETE, resourceFile);
} catch (Exception e) {