This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit d7ae74c9c14e961de9eb950006057a07150e3dc5
Author: Ritik Raj <[email protected]>
AuthorDate: Wed Jul 16 23:57:36 2025 +0530

[ASTERIXDB-3601][FUNC] Added column-count function

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Added function to give count of columns per storage partition for a collection with column storage format.

Ext-ref: MB-66306
Change-Id: Id7ce0662811ce72c882de08fbd2c0964f88d3dd4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20108
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Tested-by: Ritik Raj <[email protected]>
---
.../CollectionEstimateColumnCountDatasource.java | 83 +++++++++++++++++++
.../CollectionEstimateColumnCountFunction.java | 82 +++++++++++++++++++
.../CollectionEstimateColumnCountReader.java | 68 ++++++++++++++++
.../CollectionEstimateColumnCountRewriter.java | 94 ++++++++++++++++++++++
.../message/EstimateColumnCountRequestMessage.java | 92 +++++++++++++++++++++
.../EstimateColumnCountResponseMessage.java | 74 +++++++++++++++++
.../GetEstimatedColumnCountRequestMessage.java | 75 +++++++++++++++++
.../GetEstimatedColumnCountResponseMessage.java | 58 +++++++++++++
.../asterix/util/MetadataBuiltinFunctions.java | 7 ++
.../java/org/apache/asterix/utils/StorageUtil.java | 60 ++++++++++++++
.../collection-does-not-exist.000.query.sqlpp | 20 +++++
.../collection-does-not-exist.001.ddl.sqlpp | 20 +++++
.../collection-exists.000.ddl.sqlpp | 30 +++++++
.../collection-exists.001.update.sqlpp | 25 ++++++
.../collection-exists.002.query.sqlpp | 20 +++++
.../collection-exists.003.ddl.sqlpp | 20 +++++
.../row-format-collection.000.ddl.sqlpp | 30 +++++++
.../row-format-collection.001.query.sqlpp | 20 +++++
.../row-format-collection.002.ddl.sqlpp | 20 +++++
.../collection-exists/collection-exists.002.adm | 8 ++
.../runtimets/testsuite_cloud_storage.xml | 19 +++++
.../asterix/common/exceptions/ErrorCode.java | 1 +
.../src/main/resources/asx_errormsg/en.properties | 1 + .../apache/asterix/metadata/utils/DatasetUtil.java | 19 +++++ .../lsm/btree/column/impls/lsm/LSMColumnBTree.java | 4 + 25 files changed, 950 insertions(+) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountDatasource.java new file mode 100644 index 0000000000..c08effc94d --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountDatasource.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.app.function.collectioncolumncount; + +import java.util.Objects; + +import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.metadata.api.IDatasourceFunction; +import org.apache.asterix.metadata.declared.DataSourceId; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; + +public class CollectionEstimateColumnCountDatasource extends FunctionDataSource { + + private static final DataSourceId COLLECTION_ESTIMATE_COLUMN_COUNT_DATASOURCE_ID = + new DataSourceId(CollectionEstimateColumnCountRewriter.ESTIMATED_COLLECTION_COLUMN_COUNT.getDatabase(), + FunctionSignature + .getDataverseName(CollectionEstimateColumnCountRewriter.ESTIMATED_COLLECTION_COLUMN_COUNT), + CollectionEstimateColumnCountRewriter.ESTIMATED_COLLECTION_COLUMN_COUNT.getName()); + private final String database; + private final DataverseName dataverse; + private final String collection; + + CollectionEstimateColumnCountDatasource(INodeDomain domain, String database, DataverseName dataverse, + String collection) throws AlgebricksException { + super(COLLECTION_ESTIMATE_COLUMN_COUNT_DATASOURCE_ID, + CollectionEstimateColumnCountRewriter.ESTIMATED_COLLECTION_COLUMN_COUNT, domain); + this.database = database; + this.dataverse = dataverse; + this.collection = collection; + } + + public String getDatabase() { + return database; + } + + public DataverseName getDataverse() { + return dataverse; + } + + public String getCollection() { + return collection; + } + + @Override + protected IDatasourceFunction createFunction(MetadataProvider metadataProvider, + 
AlgebricksAbsolutePartitionConstraint locations) { + return new CollectionEstimateColumnCountFunction( + AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations()), database, dataverse, + collection); + } + + @Override + protected boolean sameFunctionDatasource(FunctionDataSource other) { + if (!Objects.equals(this.functionId, other.getFunctionId())) { + return false; + } + CollectionEstimateColumnCountDatasource that = (CollectionEstimateColumnCountDatasource) other; + return Objects.equals(this.database, that.getDatabase()) && Objects.equals(this.dataverse, that.getDataverse()) + && Objects.equals(this.collection, that.getCollection()); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountFunction.java new file mode 100644 index 0000000000..8c658b5f59 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountFunction.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.app.function.collectioncolumncount; + +import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS; + +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.app.message.GetEstimatedColumnCountRequestMessage; +import org.apache.asterix.app.message.GetEstimatedColumnCountResponseMessage; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.metadata.declared.AbstractDatasourceFunction; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class CollectionEstimateColumnCountFunction extends AbstractDatasourceFunction { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long serialVersionUID = 1L; + + private final String database; + private final DataverseName dataverseName; + private final String collection; + + CollectionEstimateColumnCountFunction(AlgebricksAbsolutePartitionConstraint locations, String database, + DataverseName dataverseName, String collection) { + super(locations); + this.database = database; + this.dataverseName = dataverseName; + this.collection = collection; + } + + @Override + public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) + throws HyracksDataException { + INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext(); + INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker(); + MessageFuture messageFuture = 
messageBroker.registerMessageFuture(); + long futureId = messageFuture.getFutureId(); + + GetEstimatedColumnCountRequestMessage request = new GetEstimatedColumnCountRequestMessage( + serviceCtx.getNodeId(), futureId, database, dataverseName, collection); + try { + messageBroker.sendMessageToPrimaryCC(request); + GetEstimatedColumnCountResponseMessage response = (GetEstimatedColumnCountResponseMessage) messageFuture + .get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + if (response.getFailure() != null) { + throw HyracksDataException.create(response.getFailure()); + } + return new CollectionEstimateColumnCountReader(response.getColumnCount()); + } catch (Exception e) { + LOGGER.info("Could not get estimated column count", e); + throw HyracksDataException.create(e); + } finally { + messageBroker.deregisterMessageFuture(futureId); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountReader.java new file mode 100644 index 0000000000..7844838f6c --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountReader.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.app.function.collectioncolumncount; + +import java.io.IOException; + +import org.apache.asterix.app.function.FunctionReader; +import org.apache.asterix.external.api.IRawRecord; +import org.apache.asterix.external.input.record.CharArrayRecord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import it.unimi.dsi.fastutil.ints.IntIterator; + +public class CollectionEstimateColumnCountReader extends FunctionReader { + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final String STORAGE_PARTITION = "id"; + private static final String COLUMN_COUNT = "columnCount"; + private final CharArrayRecord record; + private final Int2IntMap estimatedColumnCount; + private final IntIterator partitionIds; + private final ObjectNode node; + + CollectionEstimateColumnCountReader(Int2IntMap estimatedColumnCount) { + this.estimatedColumnCount = estimatedColumnCount; + this.partitionIds = estimatedColumnCount.keySet().iterator(); + this.node = mapper.createObjectNode(); + this.record = new CharArrayRecord(); + } + + @Override + public boolean hasNext() throws IOException { + return partitionIds.hasNext(); + } + + @Override + public IRawRecord<char[]> next() throws IOException { + record.reset(); + int key = partitionIds.nextInt(); + int columnCount = estimatedColumnCount.get(key); + node.removeAll(); + node.put(STORAGE_PARTITION, key); + node.put(COLUMN_COUNT, columnCount); + 
record.append(node.toString().toCharArray()); + record.endRecord(); + return record; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountRewriter.java new file mode 100644 index 0000000000..72418e5fa3 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/collectioncolumncount/CollectionEstimateColumnCountRewriter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.app.function.collectioncolumncount; + +import static org.apache.asterix.common.exceptions.ErrorCode.TYPE_MISMATCH_FUNCTION; + +import java.util.List; + +import org.apache.asterix.app.function.FunctionRewriter; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.functions.FunctionConstants; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.asterix.om.constants.AsterixConstantValue; +import org.apache.asterix.om.exceptions.ExceptionUtil; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.utils.ConstantExpressionUtil; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; + +/** + * This function takes a collection's fully qualified name (database.scope.collection) and + * returns the column count per partition. 
+ */ + +public class CollectionEstimateColumnCountRewriter extends FunctionRewriter { + + public static final FunctionIdentifier ESTIMATED_COLLECTION_COLUMN_COUNT = + FunctionConstants.newAsterix("estimated-collection-column-count", FunctionIdentifier.VARARGS); + public static final CollectionEstimateColumnCountRewriter INSTANCE = + new CollectionEstimateColumnCountRewriter(ESTIMATED_COLLECTION_COLUMN_COUNT); + + private CollectionEstimateColumnCountRewriter(FunctionIdentifier functionId) { + super(functionId); + } + + @Override + protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression function) + throws AlgebricksException { + + if (function.getArguments().size() != 3) { + throw new CompilationException(ErrorCode.COMPILATION_INVALID_NUM_OF_ARGS, + ESTIMATED_COLLECTION_COLUMN_COUNT.getName()); + } + + verifyArgs(function.getArguments()); + ILogicalExpression databaseExpr = function.getArguments().get(0).getValue(); + ILogicalExpression scopeExpr = function.getArguments().get(1).getValue(); + ILogicalExpression collectionExpr = function.getArguments().get(2).getValue(); + + String database = ConstantExpressionUtil.getStringConstant(databaseExpr); + DataverseName dataverse = + DataverseName.createSinglePartName(ConstantExpressionUtil.getStringConstant(scopeExpr)); + String collection = ConstantExpressionUtil.getStringConstant(collectionExpr); + + return new CollectionEstimateColumnCountDatasource(context.getComputationNodeDomain(), database, dataverse, + collection); + } + + private void verifyArgs(List<Mutable<ILogicalExpression>> args) throws CompilationException { + for (int i = 0; i < args.size(); i++) { + ConstantExpression expr = (ConstantExpression) args.get(i).getValue(); + AsterixConstantValue value = (AsterixConstantValue) expr.getValue(); + ATypeTag type = value.getObject().getType().getTypeTag(); + if (type != ATypeTag.STRING) { + throw new CompilationException(TYPE_MISMATCH_FUNCTION, 
ESTIMATED_COLLECTION_COLUMN_COUNT.getName(), + ExceptionUtil.indexToPosition(i), ATypeTag.STRING, type); + } + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EstimateColumnCountRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EstimateColumnCountRequestMessage.java new file mode 100644 index 0000000000..7272f3bfd1 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EstimateColumnCountRequestMessage.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.message; + +import java.util.List; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.CcIdentifiedMessage; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.asterix.metadata.utils.DatasetPartitions; +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; +import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; + +public class EstimateColumnCountRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long serialVersionUID = 1L; + private final long reqId; + private final List<DatasetPartitions> datasetPartitions; + + public EstimateColumnCountRequestMessage(long reqId, List<DatasetPartitions> datasetPartitions) { + this.reqId = reqId; + this.datasetPartitions = datasetPartitions; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException { + INCServiceContext ctx = appCtx.getServiceContext(); + Int2IntMap columnCountMap = new Int2IntOpenHashMap(); + try { + Thread.currentThread().setName(toString()); + for (DatasetPartitions partitions : datasetPartitions) { + for (int partition : partitions.getPartitions()) { + IIndexDataflowHelper indexDataflowHelper = partitions.getPrimaryIndexDataflowHelperFactory() + .create(ctx, partition); + try (indexDataflowHelper) { + indexDataflowHelper.open(); + LSMColumnBTree index = (LSMColumnBTree) indexDataflowHelper.getIndexInstance(); + 
columnCountMap.put(partition, index.getNumberOfColumns()); + } + } + } + EstimateColumnCountResponseMessage response = + new EstimateColumnCountResponseMessage(reqId, columnCountMap, null); + respond(appCtx, response); + } catch (Exception e) { + LOGGER.info("failed to get collection column count", e); + EstimateColumnCountResponseMessage response = new EstimateColumnCountResponseMessage(reqId, columnCountMap, e); + respond(appCtx, response); + } + } + + private void respond(INcApplicationContext appCtx, EstimateColumnCountResponseMessage response) + throws HyracksDataException { + NCMessageBroker messageBroker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + try { + messageBroker.sendMessageToPrimaryCC(response); + } catch (Exception e) { + LOGGER.info("failed to send collection column count to cc", e); + throw HyracksDataException.create(e); + } + } + + @Override + public boolean isWhispered() { + return true; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EstimateColumnCountResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EstimateColumnCountResponseMessage.java new file mode 100644 index 0000000000..b6d7796ff5 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EstimateColumnCountResponseMessage.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.app.message; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.INcResponse; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +import it.unimi.dsi.fastutil.ints.Int2IntMap; + +public class EstimateColumnCountResponseMessage implements ICcAddressedMessage, INcResponse { + + private static final long serialVersionUID = 1L; + private final long reqId; + private final Int2IntMap estimatedColumnCount; + private final Throwable failure; + + EstimateColumnCountResponseMessage(long reqId, Int2IntMap estimatedColumnCount, Throwable failure) { + this.reqId = reqId; + this.estimatedColumnCount = estimatedColumnCount; + this.failure = failure; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + broker.respond(reqId, this); + } + + @Override + public void setResult(MutablePair<ICCMessageBroker.ResponseState, Object> result) { + if (failure != null) { + result.setLeft(ICCMessageBroker.ResponseState.FAILURE); + result.setRight(failure); + return; + } + setResponse(result); + } + + private void setResponse(MutablePair<ICCMessageBroker.ResponseState, Object> result) { + switch (result.getKey()) { + case 
SUCCESS: + Int2IntMap currentColumnCount = (Int2IntMap) result.getValue(); + currentColumnCount.putAll(estimatedColumnCount); + break; + case UNINITIALIZED: + result.setLeft(ICCMessageBroker.ResponseState.SUCCESS); + result.setValue(estimatedColumnCount); + break; + default: + break; + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetEstimatedColumnCountRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetEstimatedColumnCountRequestMessage.java new file mode 100644 index 0000000000..d8c78cfd90 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetEstimatedColumnCountRequestMessage.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.app.message; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.messaging.CCMessageBroker; +import org.apache.asterix.utils.StorageUtil; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import it.unimi.dsi.fastutil.ints.Int2IntMap; + +public class GetEstimatedColumnCountRequestMessage implements ICcAddressedMessage { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long serialVersionUID = 1L; + + private final String nodeId; + private final long reqId; + private final String database; + private final DataverseName dataverse; + private final String collection; + + public GetEstimatedColumnCountRequestMessage(String nodeId, long reqId, String database, DataverseName dataverse, + String collection) { + this.nodeId = nodeId; + this.reqId = reqId; + this.database = database; + this.dataverse = dataverse; + this.collection = collection; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + + try { + Int2IntMap estimatedColumnCount = + StorageUtil.getEstimatedColumnCount(appCtx, database, dataverse, collection); + GetEstimatedColumnCountResponseMessage response = + new GetEstimatedColumnCountResponseMessage(this.reqId, estimatedColumnCount, null); + messageBroker.sendApplicationMessageToNC(response, nodeId); + } catch (Exception ex) { + LOGGER.info("Failed to process column count request", ex); + try { + GetEstimatedColumnCountResponseMessage response = + new GetEstimatedColumnCountResponseMessage(this.reqId, null, ex); + 
messageBroker.sendApplicationMessageToNC(response, nodeId); + } catch (Exception ex2) { + LOGGER.info("Failed to process column count request", ex2); + throw HyracksDataException.create(ex2); + } + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetEstimatedColumnCountResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetEstimatedColumnCountResponseMessage.java new file mode 100644 index 0000000000..fbbf270a80 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetEstimatedColumnCountResponseMessage.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +import it.unimi.dsi.fastutil.ints.Int2IntMap; + +public class GetEstimatedColumnCountResponseMessage implements INcAddressedMessage { + + private static final long serialVersionUID = 1L; + private final long reqId; + private final Int2IntMap columnCount; + private final Throwable failure; + + public GetEstimatedColumnCountResponseMessage(long reqId, Int2IntMap columnCount, Throwable failure) { + this.reqId = reqId; + this.columnCount = columnCount; + this.failure = failure; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + MessageFuture future = mb.deregisterMessageFuture(reqId); + if (future != null) { + future.complete(this); + } + } + + public Int2IntMap getColumnCount() { + return columnCount; + } + + public Throwable getFailure() { + return failure; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java index ea8d5fcfaf..e86eb8a0ad 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java @@ -31,6 +31,7 @@ import org.apache.asterix.app.function.QueryPartitionRewriter; import org.apache.asterix.app.function.StorageComponentsRewriter; import org.apache.asterix.app.function.TPCDSAllTablesDataGeneratorRewriter; import 
org.apache.asterix.app.function.TPCDSSingleTableDataGeneratorRewriter; +import org.apache.asterix.app.function.collectioncolumncount.CollectionEstimateColumnCountRewriter; import org.apache.asterix.app.function.collectionsize.StorageSizeRewriter; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.typecomputer.impl.AInt64TypeComputer; @@ -112,6 +113,12 @@ public class MetadataBuiltinFunctions { BuiltinFunctions.addFunction(StorageSizeRewriter.STORAGE_SIZE, AInt64TypeComputer.INSTANCE, true); BuiltinFunctions.addUnnestFun(StorageSizeRewriter.STORAGE_SIZE, true); BuiltinFunctions.addDatasourceFunction(StorageSizeRewriter.STORAGE_SIZE, StorageSizeRewriter.INSTANCE); + // Estimate column count + BuiltinFunctions.addFunction(CollectionEstimateColumnCountRewriter.ESTIMATED_COLLECTION_COLUMN_COUNT, + (expression, env, metadataProvider) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); + BuiltinFunctions.addUnnestFun(CollectionEstimateColumnCountRewriter.ESTIMATED_COLLECTION_COLUMN_COUNT, true); + BuiltinFunctions.addDatasourceFunction(CollectionEstimateColumnCountRewriter.ESTIMATED_COLLECTION_COLUMN_COUNT, + CollectionEstimateColumnCountRewriter.INSTANCE); } private MetadataBuiltinFunctions() { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/StorageUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/StorageUtil.java index 3eb9d08928..98860c6ac8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/StorageUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/StorageUtil.java @@ -24,8 +24,10 @@ import static org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.asterix.app.message.EstimateColumnCountRequestMessage; import org.apache.asterix.app.message.StorageSizeRequestMessage; import 
org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.api.IMetadataLockManager; @@ -43,8 +45,12 @@ import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.metadata.utils.DatasetPartitions; +import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.hyracks.api.util.InvokeUtil; +import it.unimi.dsi.fastutil.ints.Int2IntMap; + public class StorageUtil { public static long getCollectionSize(ICcApplicationContext appCtx, String database, DataverseName dataverse, @@ -99,4 +105,70 @@ public class StorageUtil { () -> metadataProvider.getLocks().unlock()); } } + + /** + * Estimates the number of columns in each storage partition of a column-format collection. + * <p> + * Requires the cluster to be ACTIVE or REBALANCE_REQUIRED; otherwise a RuntimeDataException with + * REJECT_BAD_CLUSTER_STATE is thrown. Takes metadata read locks on the database, dataverse and collection, + * then sends an {@link EstimateColumnCountRequestMessage} to every participant NC, with a 60 second timeout. + * + * @return the map returned by the message broker, presumably keyed by storage partition id + * @throws CompilationException if the collection does not exist, is not an internal collection, or does + * not use the column storage format + */ + public static Int2IntMap getEstimatedColumnCount(ICcApplicationContext appCtx, String database, + DataverseName dataverse, String collection) throws Exception { + IClusterManagementWork.ClusterState state = appCtx.getClusterStateManager().getState(); + if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) { + throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state); + } + + if (!appCtx.getNamespaceResolver().isUsingDatabase()) { + database = MetadataConstants.DEFAULT_DATABASE; + } + + IMetadataLockManager lockManager = appCtx.getMetadataLockManager(); + MetadataProvider metadataProvider = MetadataProvider.createWithDefaultNamespace(appCtx); + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + try { + lockManager.acquireDatabaseReadLock(metadataProvider.getLocks(), database); + lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), database, dataverse); + lockManager.acquireDatasetReadLock(metadataProvider.getLocks(), database, dataverse, collection); + Dataset dataset = metadataProvider.findDataset(database, dataverse, collection); + if (dataset == null) { + throw new
CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, collection, + MetadataUtil.dataverseName(database, dataverse, metadataProvider.isUsingDatabase())); + } + + if (dataset.getDatasetType() != DatasetConfig.DatasetType.INTERNAL) { + throw new CompilationException(ErrorCode.ESTIMATED_COLUMN_COUNT_NOT_APPLICABLE_TO_TYPE, + "collection type: " + dataset.getDatasetType()); + } + + if (dataset.getDatasetFormatInfo().getFormat() != DatasetConfig.DatasetFormat.COLUMN) { + throw new CompilationException(ErrorCode.ESTIMATED_COLUMN_COUNT_NOT_APPLICABLE_TO_TYPE, + "storage format type: " + dataset.getDatasetFormatInfo().getFormat()); + } + + final List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes()); + CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + + Map<String, List<DatasetPartitions>> nodeResources = + DatasetUtil.getNodeResourcesWithoutSecondaries(metadataProvider, dataset); + long reqId = messageBroker.newRequestId(); + List<EstimateColumnCountRequestMessage> requests = new ArrayList<>(); + for (String nc : ncs) { + // NCs that host no partition of this collection get an empty list instead of null + requests.add(new EstimateColumnCountRequestMessage(reqId, nodeResources.getOrDefault(nc, List.of()))); + } + + return (Int2IntMap) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, TimeUnit.SECONDS.toMillis(60), + true); + } finally { + InvokeUtil.tryWithCleanups(() -> MetadataManager.INSTANCE.commitTransaction(mdTxnCtx), + () -> metadataProvider.getLocks().unlock()); + } + } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-does-not-exist/collection-does-not-exist.000.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-does-not-exist/collection-does-not-exist.000.query.sqlpp new file mode 100644 index 0000000000..59fc7b9e3c --- /dev/null +++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-does-not-exist/collection-does-not-exist.000.query.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +SELECT VALUE v FROM `estimated-collection-column-count`("testDatabase", "testScope", "doesNotExistCollection") AS v; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-does-not-exist/collection-does-not-exist.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-does-not-exist/collection-does-not-exist.001.ddl.sqlpp new file mode 100644 index 0000000000..10b92b412d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-does-not-exist/collection-does-not-exist.001.ddl.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +DROP DATABASE testDatabase IF EXISTS; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.000.ddl.sqlpp new file mode 100644 index 0000000000..28ec780dde --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.000.ddl.sqlpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +DROP DATABASE testDatabase IF EXISTS; +CREATE DATABASE testDatabase; +CREATE DATAVERSE testDatabase.testScope; +USE testDatabase.testScope; + +CREATE COLLECTION testCollection +PRIMARY KEY (id: int) WITH { + "storage-format": { + "format": "column" + } +}; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.001.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.001.update.sqlpp new file mode 100644 index 0000000000..4383138aaa --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.001.update.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +USE testDatabase.testScope; + +INSERT INTO testCollection( + SELECT VALUE {"id": intVal, "some_value": intVal} + FROM range(1, 512) intVal +); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.002.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.002.query.sqlpp new file mode 100644 index 0000000000..a175ae4c66 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.002.query.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +SELECT VALUE v FROM `estimated-collection-column-count`("testDatabase", "testScope", "testCollection") AS v; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.003.ddl.sqlpp new file mode 100644 index 0000000000..51261d2247 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/collection-exists/collection-exists.003.ddl.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +DROP DATABASE testDatabase IF EXISTS; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/row-format-collection/row-format-collection.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/row-format-collection/row-format-collection.000.ddl.sqlpp new file mode 100644 index 0000000000..f39ea1b23c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/row-format-collection/row-format-collection.000.ddl.sqlpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +DROP DATABASE testDatabase IF EXISTS; +CREATE DATABASE testDatabase; +CREATE DATAVERSE testDatabase.testScope; +USE testDatabase.testScope; + +CREATE COLLECTION testCollection +PRIMARY KEY (id: int) WITH { + "storage-format": { + "format": "row" + } +}; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/row-format-collection/row-format-collection.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/row-format-collection/row-format-collection.001.query.sqlpp new file mode 100644 index 0000000000..27741857b4 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/row-format-collection/row-format-collection.001.query.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +SELECT VALUE v FROM `estimated-collection-column-count`("testDatabase", "testScope", "testCollection") AS v; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/row-format-collection/row-format-collection.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/row-format-collection/row-format-collection.002.ddl.sqlpp new file mode 100644 index 0000000000..10b92b412d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/column-count/row-format-collection/row-format-collection.002.ddl.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +DROP DATABASE testDatabase IF EXISTS; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/column-count/collection-exists/collection-exists.002.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/column-count/collection-exists/collection-exists.002.adm new file mode 100644 index 0000000000..4d8c55e5e0 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/column-count/collection-exists/collection-exists.002.adm @@ -0,0 +1,8 @@ +{ "id": 0, "columnCount": 2 } +{ "id": 1, "columnCount": 2 } +{ "id": 3, "columnCount": 2 } +{ "id": 2, "columnCount": 2 } +{ "id": 7, "columnCount": 2 } +{ "id": 6, "columnCount": 2 } +{ "id": 4, "columnCount": 2 } +{ "id": 5, "columnCount": 2 } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_cloud_storage.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_cloud_storage.xml index 03fa519af5..4a612f77cf 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_cloud_storage.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_cloud_storage.xml @@ -104,5 +104,24 @@ </compilation-unit> </test-case> </test-group> + <test-group name="column-count"> + <test-case FilePath="column/column-count"> + <compilation-unit name="collection-does-not-exist"> + <output-dir compare="Clean-JSON">collection-does-not-exist</output-dir> + <expected-error>Cannot find dataset with name doesNotExistCollection in dataverse testDatabase.testScope</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="column/column-count"> + <compilation-unit name="row-format-collection"> + <output-dir compare="Clean-JSON">row-format-collection</output-dir> + <expected-error>Retrieving estimated column count is not applicable to storage format type: ROW.</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="column/column-count"> + <compilation-unit name="collection-exists"> + 
<output-dir compare="Clean-JSON">collection-exists</output-dir> + </compilation-unit> + </test-case> + </test-group> &sqlpp_queries; </test-suite> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index f9c4f213a3..1dadeba02e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -330,6 +330,7 @@ public enum ErrorCode implements IError { NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT(1222), INVALID_TRANSFORM_FUNCTION(1223), CANNOT_REPLACE_OBJECT_DEPENDENT_EXISTS(1224), + ESTIMATED_COLUMN_COUNT_NOT_APPLICABLE_TO_TYPE(1225), // Feed errors DATAFLOW_ILLEGAL_STATE(3001), diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index c760e55b2b..9c875d0e20 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -332,6 +332,7 @@ 1222 = No valid authentication parameters were provided to impersonate service account 1223 = Failed to create transform function. Encountered error: '%1$s' 1224 = Cannot replace %1$s %2$s being used by %3$s %4$s +1225 = Retrieving estimated column count is not applicable to %1$s. # Feed Errors 3001 = Illegal state. 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 91187ae171..55f42599ca 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -817,6 +817,13 @@ public class DatasetUtil { secondaries .add(new IndexDataflowHelperFactory(storageManager, idxPartitioningProperties.getSplitsProvider())); } + populateNcResources(dataset, partitioningProperties, nc2Resources, primary, secondaries); + return nc2Resources; + } + + private static void populateNcResources(Dataset dataset, PartitioningProperties partitioningProperties, + Map<String, List<DatasetPartitions>> nc2Resources, IIndexDataflowHelperFactory primary, + List<IIndexDataflowHelperFactory> secondaries) { AlgebricksAbsolutePartitionConstraint computeLocations = (AlgebricksAbsolutePartitionConstraint) partitioningProperties.getConstraints(); int[][] computeStorageMap = partitioningProperties.getComputeStorageMap(); @@ -830,6 +837,18 @@ public class DatasetUtil { dsPartitions.add(storagePartition); } } + } + + public static Map<String, List<DatasetPartitions>> getNodeResourcesWithoutSecondaries( + MetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException { + Map<String, List<DatasetPartitions>> nc2Resources = new HashMap<>(); + IStorageManager storageManager = metadataProvider.getStorageComponentProvider().getStorageManager(); + + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); + IIndexDataflowHelperFactory primary = + new IndexDataflowHelperFactory(storageManager, partitioningProperties.getSplitsProvider()); + + populateNcResources(dataset, partitioningProperties, nc2Resources, primary, Collections.emptyList()); return nc2Resources; } diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java index 0ba45091ad..6050749443 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java @@ -121,6 +121,10 @@ public class LSMColumnBTree extends LSMBTree { return columnMetadata; } + public int getNumberOfColumns() { + return columnMetadata.getNumberOfColumns(); + } + @Override protected LSMBTreeRangeSearchCursor createCursor(AbstractLSMIndexOperationContext opCtx, boolean returnDeletedTuples, IIndexCursorStats stats) {
