This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new fcba666667 [ASTERIXDB-3177][CONF] Store ingestion logs outside of data storage dir fcba666667 is described below commit fcba66666796b1569073d1d1db05c024bc72b58e Author: Murtadha Hubail <mhub...@apache.org> AuthorDate: Mon May 8 16:40:28 2023 +0300 [ASTERIXDB-3177][CONF] Store ingestion logs outside of data storage dir - user model changes: no - storage format changes: no - interface changes: no Details: - Store ingestion logs on the first iodevice of each node rather than inside the data storage dir. - Move columnar dataset rebalance test to rebalance test suite. Change-Id: Ia5f836fa99f3c982b6420b092d40d88f5f6429c1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17517 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../src/test/resources/runtimets/rebalance.xml | 5 +++ .../test/resources/runtimets/testsuite_sqlpp.xml | 5 --- .../asterix/common/utils/StorageConstants.java | 1 + .../asterix/common/utils/StoragePathUtil.java | 9 ++++ .../adapter/factory/GenericAdapterFactory.java | 5 ++- .../apache/asterix/external/util/FeedUtils.java | 36 ++++------------ .../metadata/utils/DataPartitioningProvider.java | 2 +- .../hyracks/api/io/DefaultIoDeviceFileSplit.java | 48 ++++++++++++++++++++++ 8 files changed, 74 insertions(+), 37 deletions(-) diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml b/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml index 511e27bdcb..a4158ea511 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml @@ -99,5 +99,10 @@ ERROR_BODY: {"results":"to rebalance a particular dataset, the parameter dataver <output-dir compare="Text">all_datasets</output-dir> </compilation-unit> </test-case> + <test-case FilePath="column"> + <compilation-unit name="rebalance"> + <output-dir compare="Text">rebalance</output-dir> + </compilation-unit> + </test-case> </test-group> </test-suite> diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 08f71f7a17..299e32a63f 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -16274,11 +16274,6 @@ <output-dir compare="Text">secondary-index/create-index/after-upsert-with-meta</output-dir> </compilation-unit> </test-case> - <test-case FilePath="column"> - <compilation-unit name="rebalance"> - <output-dir compare="Text">rebalance</output-dir> - </compilation-unit> - </test-case> </test-group> <test-group name="copy"> <test-case FilePath="copy"> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java index c26fe76156..5d6322b0bd 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java @@ -31,6 +31,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.ConcurrentMergePolicyFacto public class StorageConstants { public static final String STORAGE_ROOT_DIR_NAME = "storage"; + public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs"; public static final String PARTITION_DIR_PREFIX = "partition_"; /** * Any file that shares the same directory as the LSM index files must diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index c87f3685e0..a9ed066009 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -30,6 +30,7 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.DefaultIoDeviceFileSplit; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.io.MappedFileSplit; @@ -65,11 +66,19 @@ public class StoragePathUtil { return new MappedFileSplit(partition.getActiveNodeId(), relativePath, partition.getIODeviceNum()); } + public static FileSplit getDefaultIoDeviceFileSpiltForNode(String nodeId, String relativePath) { + return new DefaultIoDeviceFileSplit(nodeId, relativePath); + } + public static String prepareStoragePartitionPath(int partitonId) { return Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partitonId) .toString(); } + public static String prepareIngestionLogPath() { + return Paths.get(StorageConstants.INGESTION_LOGS_DIR_NAME).toString(); + } + public static String prepareDataverseIndexName(DataverseName dataverseName, String datasetName, String idxName, long rebalanceCount) { return prepareDataverseComponentName(dataverseName, prepareFullIndexName(datasetName, idxName, rebalanceCount)); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index 3ac8a0292e..214f3b391a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -94,7 +94,8 @@ public class GenericAdapterFactory implements ITypedAdapterFactory { } if (isFeed) { if (feedLogManager == null) { - feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits); + feedLogManager = + new FeedLogManager(feedLogFileSplits[partition].getFileReference(ctx.getIoManager()).getFile()); } feedLogManager.touch(); } @@ -146,7 +147,7 @@ public class GenericAdapterFactory implements ITypedAdapterFactory { this.isFeed = ExternalDataUtils.isFeed(configuration); if (isFeed) { //TODO(partitioning) make this code reuse DataPartitioningProvider - feedLogFileSplits = FeedUtils.splitsForAdapter(appCtx, ExternalDataUtils.getDatasetDataverse(configuration), + feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDatasetDataverse(configuration), ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint()); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java index 0a91ae8c35..5baefcba47 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Random; import java.util.Set; -import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; @@ -45,11 +44,8 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.comm.FrameHelper; import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.FileSplit; -import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.util.IntSerDeUtils; @@ -84,46 +80,28 @@ public class FeedUtils { private FeedUtils() { } - public static FileSplit splitsForAdapter(DataverseName dataverseName, String feedName, String nodeName, - ClusterPartition partition) { + private static FileSplit splitsForAdapter(DataverseName dataverseName, String feedName, String nodeName) { String relPathFile = StoragePathUtil.prepareDataverseComponentName(dataverseName, feedName); - String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(partition.getPartitionId()); + String storagePartitionPath = StoragePathUtil.prepareIngestionLogPath(); // Note: feed adapter instances in a single node share the feed logger - // format: 'storage dir name'/partition_#/dataverse_part1[^dataverse_part2[...]]/feed/node + // format: 'ingestion logs dir name'/dataverse_part1[^dataverse_part2[...]]/feed/node File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName); - return StoragePathUtil.getFileSplitForClusterPartition(partition, f.getPath()); + return StoragePathUtil.getDefaultIoDeviceFileSpiltForNode(nodeName, f.getPath()); } - public static FileSplit[] splitsForAdapter(ICcApplicationContext appCtx, DataverseName dataverseName, - String feedName, AlgebricksPartitionConstraint partitionConstraints) throws AsterixException { + public static FileSplit[] splitsForAdapter(DataverseName dataverseName, String feedName, + AlgebricksPartitionConstraint partitionConstraints) throws AsterixException { if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) { throw new AsterixException("Can't create file splits for adapter with count partitioning constraints"); } String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations(); List<FileSplit> splits = new ArrayList<>(); for (String nd : locations) { - splits.add(splitsForAdapter(dataverseName, feedName, nd, - appCtx.getClusterStateManager().getNodePartitions(nd)[0])); + splits.add(splitsForAdapter(dataverseName, feedName, nd)); } return splits.toArray(new FileSplit[] {}); } - public static FileReference getAbsoluteFileRef(String relativePath, int ioDeviceId, IIOManager ioManager) { - return ioManager.getFileReference(ioDeviceId, relativePath); - } - - public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition, - FileSplit[] feedLogFileSplits) throws HyracksDataException { - return new FeedLogManager( - FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getPath(), 0, ctx.getIoManager()).getFile()); - } - - public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit) - throws HyracksDataException { - return new FeedLogManager( - FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, ctx.getIoManager()).getFile()); - } - public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta) throws HyracksDataException { // read the message and reduce the number of tuples diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java index 7257958f1d..63af664a1b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java @@ -79,7 +79,7 @@ public abstract class DataPartitioningProvider implements IDataPartitioningProvi AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0])); FileSplit[] feedLogFileSplits = - FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations); + FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC = StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits); int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(spC.second)); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/DefaultIoDeviceFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/DefaultIoDeviceFileSplit.java new file mode 100644 index 0000000000..9f88f31a24 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/DefaultIoDeviceFileSplit.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.io; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * A FileSplit that is mapped to the default IO device + */ +public class DefaultIoDeviceFileSplit extends MappedFileSplit { + + public static final int DEFAULT_IO_DEVICE_IDX = 0; + private static final long serialVersionUID = 1L; + private transient FileReference cached; + + /** + * Construct a managed File split that is mapped to the default IO device + * @param node + * @param path + */ + public DefaultIoDeviceFileSplit(String node, String path) { + super(node, path, DEFAULT_IO_DEVICE_IDX); + } + + @Override + public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException { + if (cached == null) { + cached = ioManager.getFileReference(DEFAULT_IO_DEVICE_IDX, getPath()); + } + return cached; + } +}