http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index ed864cc..e52449c 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -11121,4 +11121,63 @@ </test-case> </test-group> &GeoQueries; + <test-group name="compression"> + <test-case FilePath="compression"> + <compilation-unit name="incompressible-pages/large-page"> + <output-dir compare="Text">incompressible-pages/large-page</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="compression"> + <compilation-unit name="incompressible-pages/small-page"> + <output-dir compare="Text">incompressible-pages/small-page</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="compression"> + <compilation-unit name="invalid-compression-scheme"> + <output-dir compare="Text">invalid-compression-scheme</output-dir> + <expected-error>ASX1096: Unknown compression scheme zip. 
Supported schemes are [snappy,none]</expected-error> + <source-location>false</source-location> + </compilation-unit> + </test-case> + <test-case FilePath="compression"> + <compilation-unit name="scheme-none"> + <output-dir compare="Text">scheme-none</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="compression"> + <compilation-unit name="scheme-snappy"> + <output-dir compare="Text">scheme-snappy</output-dir> + </compilation-unit> + </test-case> + </test-group> + <test-group name="ddl-with-clause"> + <test-case FilePath="ddl-with-clause"> + <compilation-unit name="missing-non-optional"> + <output-dir compare="Text">missing-non-optional</output-dir> + <expected-error>ASX1061: Field "merge-policy.name" in the with clause cannot be null or missing</expected-error> + <source-location>false</source-location> + </compilation-unit> + </test-case> + <test-case FilePath="ddl-with-clause"> + <compilation-unit name="type-mismatch"> + <output-dir compare="Text">type-mismatch</output-dir> + <expected-error>ASX1060: Field "merge-policy.parameters.max-mergable-component-size" in the with clause must be of type bigint, but found string</expected-error> + <source-location>false</source-location> + </compilation-unit> + </test-case> + <test-case FilePath="ddl-with-clause"> + <compilation-unit name="unsupported-field"> + <output-dir compare="Text">unsupported-field</output-dir> + <expected-error>ASX1059: Field(s) [unknown-field] unsupported in the with clause</expected-error> + <source-location>false</source-location> + </compilation-unit> + </test-case> + <test-case FilePath="ddl-with-clause"> + <compilation-unit name="unsupported-subfield"> + <output-dir compare="Text">unsupported-subfield</output-dir> + <expected-error>ASX1097: Subfield(s) [unknown-subfield] in "merge-policy" unsupported in the with clause</expected-error> + <source-location>false</source-location> + </compilation-unit> + </test-case> + </test-group> </test-suite>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java index 85e44ea..6bd73e7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java @@ -22,6 +22,7 @@ import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT; import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT; import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER; +import static org.apache.hyracks.control.common.config.OptionTypes.STRING; import static org.apache.hyracks.control.common.config.OptionTypes.UNSIGNED_INTEGER; import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE; @@ -47,7 +48,8 @@ public class StorageProperties extends AbstractProperties { STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS(POSITIVE_INTEGER, 2), STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES(POSITIVE_INTEGER, 8), STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d), - STORAGE_MAX_ACTIVE_WRITABLE_DATASETS(UNSIGNED_INTEGER, 8); + STORAGE_MAX_ACTIVE_WRITABLE_DATASETS(UNSIGNED_INTEGER, 8), + STORAGE_COMPRESSION_BLOCK(STRING, "none"); private final IOptionType interpreter; private final Object defaultValue; @@ -88,6 +90,8 @@ public class StorageProperties extends AbstractProperties { return "The maximum acceptable false positive rate for bloom filters associated with LSM indexes"; case STORAGE_MAX_ACTIVE_WRITABLE_DATASETS: return "The maximum number of 
datasets that can be concurrently modified"; + case STORAGE_COMPRESSION_BLOCK: + return "The default compression scheme for the storage"; default: throw new IllegalStateException("NYI: " + this); } @@ -179,6 +183,10 @@ public class StorageProperties extends AbstractProperties { return accessor.getInt(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS); } + public String getCompressionScheme() { + return accessor.getString(Option.STORAGE_COMPRESSION_BLOCK); + } + protected int getMetadataDatasets() { return MetadataIndexImmutableProperties.METADATA_DATASETS_COUNT; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index 18e3327..81ae3e1 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -21,14 +21,15 @@ package org.apache.asterix.common.dataflow; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.api.INodeJobTracker; -import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.ExtensionProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.replication.INcLifecycleCoordinator; +import 
org.apache.asterix.common.storage.ICompressionManager; import org.apache.asterix.common.transactions.IResourceIdManager; +import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.storage.common.IStorageManager; @@ -127,4 +128,9 @@ public interface ICcApplicationContext extends IApplicationContext { * @return the transaction id factory */ ITxnIdFactory getTxnIdFactory(); + + /** + * @return the compression manager + */ + ICompressionManager getCompressionManager(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 0bf446e..54fd65c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -147,7 +147,7 @@ public class ErrorCode { public static final int UNSUPPORTED_WITH_FIELD = 1059; public static final int WITH_FIELD_MUST_BE_OF_TYPE = 1060; public static final int WITH_FIELD_MUST_CONTAIN_SUB_FIELD = 1061; - public static final int MERGE_POLICY_PARAMETER_INVALID_TYPE = 1062; + public static final int CONFIGURATION_PARAMETER_INVALID_TYPE = 1062; public static final int UNKNOWN_DATAVERSE = 1063; public static final int ERROR_OCCURRED_BETWEEN_TWO_TYPES_CONVERSION = 1064; public static final int CHOSEN_INDEX_COUNT_SHOULD_BE_GREATER_THAN_ONE = 1065; @@ -181,6 +181,8 @@ public class ErrorCode { public static final int COMPILATION_TRANSLATION_ERROR = 1093; public static final int RANGE_MAP_ERROR = 1094; public 
static final int COMPILATION_EXPECTED_FUNCTION_CALL = 1095; + public static final int UNKNOWN_COMPRESSION_SCHEME = 1096; + public static final int UNSUPPORTED_WITH_SUBFIELD = 1097; // Feed errors public static final int DATAFLOW_ILLEGAL_STATE = 3001; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java new file mode 100644 index 0000000..b9a0baf --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.storage; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; + +/** + * An interface for the compression manager which handles all the registered + * schemes and validates the provided configurations. + */ +public interface ICompressionManager { + + /** + * Get a registered CompressorDecompressorFactory + * + * @param schemeName + * Compression scheme name + * @return Compressor/Decompressor factory if the scheme is specified or NOOP o.w + * @throws CompilationException + */ + ICompressorDecompressorFactory getFactory(String schemeName) throws CompilationException; + + /** + * Get the specified compression scheme in the DDL or the default one + * + * @param ddlScheme + * Compression scheme name from DDL + * @return DDL or default compression scheme name + * @throws CompilationException + */ + String getDdlOrDefaultCompressionScheme(String ddlScheme) throws CompilationException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 8c17ec6..7aa9b91 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -132,10 +132,10 @@ 1056 = Too many options were specified for %1$s 1057 = Expression of type %1$s is not supported in constant record 1058 = Literal of type %1$s is not supported in constant record -1059 = Field \"%1$s\" unsupported in the with clause -1060 = Field \"%1$s\" in the with clause must be of type %2$s -1061 = Field \"%1$s\" in the with clause must contain sub field \"%2$s\" -1062 = Merge policy parameters 
cannot be of type %1$s +1059 = Field(s) %1$s unsupported in the with clause +1060 = Field \"%1$s\" in the with clause must be of type %2$s, but found %3$s +1061 = Field \"%1$s\" in the with clause cannot be null or missing +1062 = Configuration parameter cannot be of type %1$s 1063 = Cannot find dataverse with name %1$s 1064 = An error was occurred while converting type %1$s to type %2$s. 1065 = There should be at least two applicable indexes. @@ -168,6 +168,8 @@ 1093 = A parser error has occurred. The detail exception: %1$s 1094 = Cannot parse range map: %1$s 1095 = Expected function call +1096 = Unknown compression scheme %1$s. Supported schemes are %2$s +1097 = Subfield(s) %1$s in \"%2$s\" unsupported in the with clause # Feed Errors 3001 = Illegal state. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/pom.xml b/asterixdb/asterix-lang-common/pom.xml index dde41e0..bb153a5 100644 --- a/asterixdb/asterix-lang-common/pom.xml +++ b/asterixdb/asterix-lang-common/pom.xml @@ -107,5 +107,9 @@ <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-data-std</artifactId> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java index 3d5b815..9f01b1c 100644 --- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java @@ -18,19 +18,19 @@ */ package org.apache.asterix.lang.common.statement; +import java.util.Map; + import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.lang.common.base.AbstractStatement; import org.apache.asterix.lang.common.expression.RecordConstructor; import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.lang.common.util.ConfigurationUtil; import org.apache.asterix.lang.common.util.ExpressionUtils; -import org.apache.asterix.lang.common.util.MergePolicyUtils; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; import org.apache.asterix.object.base.AdmObjectNode; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; -import java.util.Map; - /** * The new create feed statement only concerns the feed adaptor configuration. * All feeds are considered as primary feeds. 
@@ -76,7 +76,7 @@ public class CreateFeedStatement extends AbstractStatement { } public Map<String, String> getConfiguration() throws CompilationException { - return MergePolicyUtils.toProperties(withObjectNode); + return ConfigurationUtil.toProperties(withObjectNode); } public AdmObjectNode getWithObjectNode() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java index 4aeb6d3..0a17b24 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java @@ -18,31 +18,22 @@ */ package org.apache.asterix.lang.common.statement; -import java.util.Arrays; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.lang.common.base.AbstractStatement; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.expression.RecordConstructor; import org.apache.asterix.lang.common.struct.Identifier; -import org.apache.asterix.lang.common.util.ExpressionUtils; -import org.apache.asterix.lang.common.util.MergePolicyUtils; +import org.apache.asterix.lang.common.util.ConfigurationUtil; +import org.apache.asterix.lang.common.util.DatasetDeclParametersUtil; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; import org.apache.asterix.object.base.AdmObjectNode; -import 
org.apache.asterix.object.base.AdmStringNode; import org.apache.asterix.object.base.IAdmNode; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.asterix.runtime.compression.CompressionManager; public class DatasetDecl extends AbstractStatement { - protected static final String[] WITH_OBJECT_FIELDS = new String[] { MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME }; - protected static final Set<String> WITH_OBJECT_FIELDS_SET = new HashSet<>(Arrays.asList(WITH_OBJECT_FIELDS)); - protected final Identifier name; protected final Identifier dataverse; protected final Identifier itemTypeDataverse; @@ -76,14 +67,7 @@ public class DatasetDecl extends AbstractStatement { } this.nodegroupName = nodeGroupName; this.hints = hints; - try { - this.withObjectNode = withRecord == null ? null : ExpressionUtils.toNode(withRecord); - } catch (CompilationException e) { - throw e; - } catch (AlgebricksException e) { - // TODO(tillw) make signatures throw Algebricks exceptions - throw new CompilationException(e); - } + this.withObjectNode = DatasetDeclParametersUtil.validateAndGetWithObjectNode(withRecord); this.ifNotExists = ifNotExists; this.datasetType = datasetType; this.datasetDetailsDecl = idd; @@ -141,50 +125,17 @@ public class DatasetDecl extends AbstractStatement { return nodegroupName; } - public String getCompactionPolicy() throws CompilationException { - AdmObjectNode mergePolicy = getMergePolicyObject(); - if (mergePolicy == null) { - return null; - } - IAdmNode mergePolicyName = mergePolicy.get(MergePolicyUtils.MERGE_POLICY_NAME_PARAMETER_NAME); - if (mergePolicyName == null) { - throw new CompilationException(ErrorCode.WITH_FIELD_MUST_CONTAIN_SUB_FIELD, - MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME, MergePolicyUtils.MERGE_POLICY_NAME_PARAMETER_NAME); - } - if (mergePolicyName.getType() != ATypeTag.STRING) { - throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE, - 
MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME + '.' - + MergePolicyUtils.MERGE_POLICY_NAME_PARAMETER_NAME, - ATypeTag.STRING); - } - return ((AdmStringNode) mergePolicyName).get(); + private AdmObjectNode getMergePolicyObject() { + return (AdmObjectNode) withObjectNode.get(DatasetDeclParametersUtil.MERGE_POLICY_PARAMETER_NAME); } - private static AdmObjectNode validateWithObject(AdmObjectNode withObject) throws CompilationException { - if (withObject == null) { - return null; - } - for (String name : withObject.getFieldNames()) { - if (!WITH_OBJECT_FIELDS_SET.contains(name)) { - throw new CompilationException(ErrorCode.UNSUPPORTED_WITH_FIELD, name); - } - } - return withObject; - } - - private AdmObjectNode getMergePolicyObject() throws CompilationException { - if (withObjectNode == null) { - return null; - } - IAdmNode mergePolicy = validateWithObject(withObjectNode).get(MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME); + public String getCompactionPolicy() { + AdmObjectNode mergePolicy = getMergePolicyObject(); if (mergePolicy == null) { return null; } - if (!mergePolicy.isObject()) { - throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE, - MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME, ATypeTag.OBJECT); - } - return (AdmObjectNode) mergePolicy; + + return mergePolicy.getOptionalString(DatasetDeclParametersUtil.MERGE_POLICY_NAME_PARAMETER_NAME); } public Map<String, String> getCompactionPolicyProperties() throws CompilationException { @@ -192,17 +143,26 @@ public class DatasetDecl extends AbstractStatement { if (mergePolicy == null) { return null; } - IAdmNode mergePolicyParameters = mergePolicy.get(MergePolicyUtils.MERGE_POLICY_PARAMETERS_PARAMETER_NAME); + IAdmNode mergePolicyParameters = + mergePolicy.get(DatasetDeclParametersUtil.MERGE_POLICY_PARAMETERS_PARAMETER_NAME); if (mergePolicyParameters == null) { return null; } - if (mergePolicyParameters.getType() != ATypeTag.OBJECT) { - throw new 
CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE, - MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME + '.' - + MergePolicyUtils.MERGE_POLICY_PARAMETERS_PARAMETER_NAME, - ATypeTag.OBJECT); + return ConfigurationUtil.toProperties((AdmObjectNode) mergePolicyParameters); + } + + public String getDatasetCompressionScheme() { + if (datasetType != DatasetType.INTERNAL) { + return CompressionManager.NONE; + } + + final AdmObjectNode storageBlockCompression = + (AdmObjectNode) withObjectNode.get(DatasetDeclParametersUtil.STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME); + if (storageBlockCompression == null) { + return null; } - return MergePolicyUtils.toProperties((AdmObjectNode) mergePolicyParameters); + return storageBlockCompression + .getOptionalString(DatasetDeclParametersUtil.STORAGE_BLOCK_COMPRESSION_SCHEME_PARAMETER_NAME); } public Map<String, String> getHints() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java new file mode 100644 index 0000000..4d0baeb --- /dev/null +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.lang.common.util; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Deque; +import java.util.HashSet; +import java.util.Set; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.object.base.AdmArrayNode; +import org.apache.asterix.object.base.AdmObjectNode; +import org.apache.asterix.object.base.IAdmNode; +import org.apache.asterix.om.types.AOrderedListType; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.AUnionType; +import org.apache.asterix.om.types.IAType; +import org.apache.commons.lang3.tuple.MutablePair; + +class ConfigurationTypeValidator { + //Error information + private final Set<String> unknownFieldNames; + private final Deque<String> path; + private final MutablePair<ATypeTag, ATypeTag> expectedActualTypePair; + private ErrorType result; + + public enum ErrorType { + UNKNOWN_FIELD_NAMES, + TYPE_MISMATCH, + MISSING_UNOPTIONAL_FIELD + } + + protected ConfigurationTypeValidator() { + unknownFieldNames = new HashSet<>(); + path = new ArrayDeque<>(); + expectedActualTypePair = new MutablePair<>(null, null); + } + + public void validateType(IAType type, IAdmNode node) throws CompilationException { + if (!validate(type, node)) { + throwException(); + } + } + + private boolean validate(IAType type, IAdmNode node) { + if (type.getTypeTag().isDerivedType()) { + return validateDerivedType(type, node); + } else 
if (node == null) { + result = ErrorType.MISSING_UNOPTIONAL_FIELD; + return false; + } else if (type.getTypeTag() != node.getType()) { + setExpectedAndActualType(type.getTypeTag(), node.getType()); + return false; + } + + return true; + } + + private boolean validateDerivedType(IAType type, IAdmNode node) { + final ATypeTag typeTag = type.getTypeTag(); + switch (typeTag) { + case UNION: + return validateUnionType(type, node); + case OBJECT: + return validateObject(type, node); + case ARRAY: + return validateArray(type, node); + default: + throw new IllegalStateException("Unsupported derived type: " + typeTag); + } + } + + private boolean validateUnionType(IAType type, IAdmNode node) { + if (node == null || node.getType() == ATypeTag.NULL) { + return true; + } + return validate(((AUnionType) type).getActualType(), node); + } + + private boolean validateObject(IAType type, IAdmNode node) { + if (node.getType() != ATypeTag.OBJECT) { + setExpectedAndActualType(ATypeTag.OBJECT, node.getType()); + return false; + } + + final ARecordType recordType = (ARecordType) type; + final AdmObjectNode objectNode = (AdmObjectNode) node; + + final String[] fieldNames = recordType.getFieldNames(); + + //Check field names + final Set<String> definedFieldNames = new HashSet<>(Arrays.asList(fieldNames)); + final Set<String> objectFieldNames = objectNode.getFieldNames(); + if (!definedFieldNames.containsAll(objectFieldNames)) { + setUnknownFieldNames(definedFieldNames, objectFieldNames); + return false; + } + + final IAType[] fieldTypes = recordType.getFieldTypes(); + + for (int i = 0; i < fieldTypes.length; i++) { + if (!validate(fieldTypes[i], objectNode.get(fieldNames[i]))) { + addToPath(fieldNames[i]); + return false; + } + } + return true; + } + + private boolean validateArray(IAType type, IAdmNode node) { + if (node.getType() != ATypeTag.ARRAY) { + setExpectedAndActualType(ATypeTag.ARRAY, node.getType()); + return false; + } + + final IAType itemType = ((AOrderedListType) 
type).getItemType(); + final AdmArrayNode array = (AdmArrayNode) node; + for (int i = 0; i < array.size(); i++) { + if (!validate(itemType, array.get(i))) { + addToPath(i); + return false; + } + } + return true; + } + + private void setUnknownFieldNames(Set<String> definedFieldNames, Set<String> objectFieldNames) { + unknownFieldNames.addAll(objectFieldNames); + unknownFieldNames.removeAll(definedFieldNames); + result = ErrorType.UNKNOWN_FIELD_NAMES; + } + + private void setExpectedAndActualType(ATypeTag expectedTypeTag, ATypeTag actualTypeTag) { + expectedActualTypePair.left = expectedTypeTag; + expectedActualTypePair.right = actualTypeTag; + result = ErrorType.TYPE_MISMATCH; + } + + private void addToPath(String fieldName) { + if (path.isEmpty()) { + path.push(fieldName); + } else { + path.push(fieldName + "."); + } + } + + private void addToPath(int arrayIndex) { + path.push("[" + arrayIndex + "]"); + } + + private void throwException() throws CompilationException { + final StringBuilder pathBuilder = new StringBuilder(); + while (!path.isEmpty()) { + pathBuilder.append(path.pop()); + } + switch (result) { + case UNKNOWN_FIELD_NAMES: + if (pathBuilder.length() > 0) { + throw new CompilationException(ErrorCode.UNSUPPORTED_WITH_SUBFIELD, unknownFieldNames.toString(), + pathBuilder.toString()); + } + throw new CompilationException(ErrorCode.UNSUPPORTED_WITH_FIELD, unknownFieldNames.toString()); + case TYPE_MISMATCH: + throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE, pathBuilder.toString(), + expectedActualTypePair.left, expectedActualTypePair.right); + case MISSING_UNOPTIONAL_FIELD: + throw new CompilationException(ErrorCode.WITH_FIELD_MUST_CONTAIN_SUB_FIELD, pathBuilder.toString()); + + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java new file mode 100644 index 0000000..4bb799b --- /dev/null +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.lang.common.util; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.object.base.AdmObjectNode; +import org.apache.asterix.object.base.AdmStringNode; +import org.apache.asterix.object.base.IAdmNode; + +public class ConfigurationUtil { + + private ConfigurationUtil() { + } + + /** + * Convert the parameters object to a Map<String,String> + * This method should go away once we store the with object as it is in storage + * + * @param parameters + * the parameters passed for the merge policy in the with clause + * @return the parameters as a map + */ + public static Map<String, String> toProperties(AdmObjectNode parameters) throws CompilationException { + Map<String, String> map = new HashMap<>(); + for (Entry<String, IAdmNode> field : parameters.getFields()) { + IAdmNode value = field.getValue(); + map.put(field.getKey(), getStringValue(value)); + } + return map; + } + + /** + * Get string value of {@link IAdmNode} + * + * @param value + * IAdmNode value should be of type integer or string + * @return + * string value of <code>value</code> + * @throws CompilationException + */ + public static String getStringValue(IAdmNode value) throws CompilationException { + switch (value.getType()) { + case BOOLEAN: + case DOUBLE: + case BIGINT: + return value.toString(); + case STRING: + return ((AdmStringNode) value).get(); + default: + throw new CompilationException(ErrorCode.CONFIGURATION_PARAMETER_INVALID_TYPE, value.getType()); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java new file mode 100644 index 0000000..effe4b8 --- /dev/null +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.asterix.lang.common.util; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.lang.common.expression.RecordConstructor; +import org.apache.asterix.object.base.AdmObjectNode; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.AUnionType; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; + +public class DatasetDeclParametersUtil { + /* *********************************************** + * Merge Policy Parameters + * *********************************************** + */ + public static final String MERGE_POLICY_PARAMETER_NAME = "merge-policy"; + public static final String MERGE_POLICY_NAME_PARAMETER_NAME = "name"; + public static final String MERGE_POLICY_PARAMETERS_PARAMETER_NAME = "parameters"; + public static final String MERGE_POLICY_MERGABLE_SIZE_PARAMETER_NAME = "max-mergable-component-size"; + public static final String MERGE_POLICY_TOLERANCE_COUNT_PARAMETER_NAME = "max-tolerance-component-count"; + public static final String MERGE_POLICY_NUMBER_COMPONENTS_PARAMETER_NAME = "num-components"; + + /* *********************************************** + * Storage Block Compression Parameters + * *********************************************** + */ + public static final String STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME = "storage-block-compression"; + public static final String STORAGE_BLOCK_COMPRESSION_SCHEME_PARAMETER_NAME = "scheme"; + + /* *********************************************** + * Private members + * *********************************************** + */ + private static final ARecordType WITH_OBJECT_TYPE = getWithObjectType(); + + private DatasetDeclParametersUtil() { + } + + /** + * Validates the with clause of a dataset declaration against the supported with-object type and converts it + * to an {@link AdmObjectNode}. + * + * @param withRecord + * the with clause record, or {@code null} if the declaration has no with clause + * @return the validated with object, or a new empty object if {@code withRecord} is {@code null} + * @throws CompilationException + * if the with clause cannot be converted or does not conform to the with-object type + */ + public static AdmObjectNode validateAndGetWithObjectNode(RecordConstructor withRecord) throws CompilationException { + if (withRecord == null) { + // Fresh node per call so callers never share (and mutate) one instance. + return
new AdmObjectNode(); + } + final ConfigurationTypeValidator validator = new ConfigurationTypeValidator(); + final AdmObjectNode node = ExpressionUtils.toNode(withRecord); + validator.validateType(WITH_OBJECT_TYPE, node); + return node; + } + + private static ARecordType getWithObjectType() { + final String[] withNames = { MERGE_POLICY_PARAMETER_NAME, STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME }; + final IAType[] withTypes = { AUnionType.createUnknownableType(getMergePolicyType()), + AUnionType.createUnknownableType(getStorageBlockCompressionType()) }; + return new ARecordType("withObject", withNames, withTypes, false); + } + + private static ARecordType getMergePolicyType() { + //merge-policy.parameters + final String[] parameterNames = { MERGE_POLICY_MERGABLE_SIZE_PARAMETER_NAME, + MERGE_POLICY_TOLERANCE_COUNT_PARAMETER_NAME, MERGE_POLICY_NUMBER_COMPONENTS_PARAMETER_NAME }; + final IAType[] parametersTypes = { AUnionType.createUnknownableType(BuiltinType.AINT64), + AUnionType.createUnknownableType(BuiltinType.AINT64), + AUnionType.createUnknownableType(BuiltinType.AINT64) }; + final ARecordType parameters = + new ARecordType(MERGE_POLICY_PARAMETERS_PARAMETER_NAME, parameterNames, parametersTypes, false); + + //merge-policy + final String[] mergePolicyNames = { MERGE_POLICY_NAME_PARAMETER_NAME, MERGE_POLICY_PARAMETERS_PARAMETER_NAME }; + final IAType[] mergePolicyTypes = { BuiltinType.ASTRING, AUnionType.createUnknownableType(parameters) }; + + return new ARecordType(MERGE_POLICY_PARAMETER_NAME, mergePolicyNames, mergePolicyTypes, false); + } + + private static ARecordType getStorageBlockCompressionType() { + final String[] schemeName = { STORAGE_BLOCK_COMPRESSION_SCHEME_PARAMETER_NAME }; + final IAType[] schemeType = { BuiltinType.ASTRING }; + return new ARecordType(STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME, schemeName, schemeType, false); + } +}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java index 25f9d07..6adb050 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java @@ -39,13 +39,12 @@ import org.apache.asterix.object.base.AdmNullNode; import org.apache.asterix.object.base.AdmObjectNode; import org.apache.asterix.object.base.AdmStringNode; import org.apache.asterix.object.base.IAdmNode; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; public class ExpressionUtils { private ExpressionUtils() { } - public static IAdmNode toNode(Expression expr) throws AlgebricksException { + public static IAdmNode toNode(Expression expr) throws CompilationException { switch (expr.getKind()) { case LIST_CONSTRUCTOR_EXPRESSION: return toNode((ListConstructor) expr); @@ -58,7 +57,7 @@ public class ExpressionUtils { } } - public static AdmObjectNode toNode(RecordConstructor recordConstructor) throws AlgebricksException { + public static AdmObjectNode toNode(RecordConstructor recordConstructor) throws CompilationException { AdmObjectNode node = new AdmObjectNode(); final List<FieldBinding> fbList = recordConstructor.getFbList(); for (int i = 0; i < fbList.size(); i++) { @@ -70,7 +69,7 @@ public class ExpressionUtils { return node; } - private static IAdmNode toNode(ListConstructor listConstructor) throws AlgebricksException { + private static IAdmNode toNode(ListConstructor listConstructor) throws CompilationException { final
List<Expression> exprList = listConstructor.getExprList(); AdmArrayNode array = new AdmArrayNode(exprList.size()); for (int i = 0; i < exprList.size(); i++) { @@ -79,7 +78,7 @@ public class ExpressionUtils { return array; } - private static IAdmNode toNode(LiteralExpr literalExpr) throws AlgebricksException { + private static IAdmNode toNode(LiteralExpr literalExpr) throws CompilationException { final Literal value = literalExpr.getValue(); final Literal.Type literalType = value.getLiteralType(); switch (literalType) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java deleted file mode 100644 index 6bb5c36..0000000 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ -package org.apache.asterix.lang.common.util; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.object.base.AdmObjectNode; -import org.apache.asterix.object.base.AdmStringNode; -import org.apache.asterix.object.base.IAdmNode; - -public class MergePolicyUtils { - public static final String MERGE_POLICY_PARAMETER_NAME = "merge-policy"; - public static final String MERGE_POLICY_NAME_PARAMETER_NAME = "name"; - public static final String MERGE_POLICY_PARAMETERS_PARAMETER_NAME = "parameters"; - - private MergePolicyUtils() { - } - - /** - * Convert the parameters object to a Map<String,String> - * This method should go away once we store the with object as it is in storage - * - * @param parameters - * the parameters passed for the merge policy in the with clause - * @return the parameters as a map - */ - public static Map<String, String> toProperties(AdmObjectNode parameters) throws CompilationException { - Map<String, String> map = new HashMap<>(); - for (Entry<String, IAdmNode> field : parameters.getFields()) { - IAdmNode value = field.getValue(); - switch (value.getType()) { - case BOOLEAN: - case DOUBLE: - case BIGINT: - map.put(field.getKey(), value.toString()); - break; - case STRING: - map.put(field.getKey(), ((AdmStringNode) value).get()); - break; - default: - throw new CompilationException(ErrorCode.MERGE_POLICY_PARAMETER_INVALID_TYPE, value.getType()); - } - } - return map; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index 95526f4..2380ab6 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -87,6 +87,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory; import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.LocalResource; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -338,7 +339,8 @@ public class MetadataBootstrap { new AsterixVirtualBufferCacheProvider(datasetId), storageComponentProvider.getIoOperationSchedulerProvider(), appContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, true, - bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, null); + bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, null, + NoOpCompressorDecompressorFactory.INSTANCE); // metadata indexes always get the no-op (uncompressed) factory DatasetLocalResourceFactory dsLocalResourceFactory = new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory); // TODO(amoudi) Creating the index should be done through the same code path as http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java index ba1ea03..d9309d9 100644 ---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java @@ -163,6 +163,11 @@ public final class MetadataRecordTypes { public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 11; public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 12; public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 13; + // Optional open fields of the Dataset record (may be absent); DatasetCompressionScheme is nested inside BlockLevelStorageCompression + public static final String DATASET_ARECORD_BLOCK_LEVEL_STORAGE_COMPRESSION_FIELD_NAME = + "BlockLevelStorageCompression"; + public static final String DATASET_ARECORD_DATASET_COMPRESSION_SCHEME_FIELD_NAME = "DatasetCompressionScheme"; + public static final String DATASET_ARECORD_REBALANCE_FIELD_NAME = "rebalanceCount"; public static final ARecordType DATASET_RECORDTYPE = createRecordType( // RecordTypeName RECORD_NAME_DATASET, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java index f5cfbb3..62a03ad 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java @@ -31,6 +31,9 @@ import org.apache.hyracks.algebricks.common.utils.Pair; */ public class DatasetHints { + private DatasetHints() { // prevent instantiation + } + /** * validate the use of a hint * http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java index 301aafb..2a1d551 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java @@ -38,6 +38,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; import org.apache.hyracks.algebricks.data.ITypeTraitProvider; +import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; @@ -50,6 +51,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; import org.apache.hyracks.storage.common.IResourceFactory; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; public class BTreeResourceFactoryProvider implements IResourceFactoryProvider { @@ -93,11 +95,20 @@ public class BTreeResourceFactoryProvider implements IResourceFactoryProvider { case INTERNAL: AsterixVirtualBufferCacheProvider vbcProvider = new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()); + + final ICompressorDecompressorFactory compDecompFactory; + if (index.isPrimaryIndex()) { + // Compress only the primary index (dataset's scheme); secondary indexes use the no-op factory + compDecompFactory =
mdProvider.getCompressionManager().getFactory(dataset.getCompressionScheme()); + } else { + compDecompFactory = NoOpCompressorDecompressorFactory.INSTANCE; + } + return new LSMBTreeLocalResourceFactory(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, true, bloomFilterFields, bloomFilterFalsePositiveRate, - index.isPrimaryIndex(), btreeFields); + index.isPrimaryIndex(), btreeFields, compDecompFactory); default: throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE, dataset.getDatasetType().toString()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index e212d11..85beb95 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -37,6 +37,7 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.metadata.LockList; +import org.apache.asterix.common.storage.ICompressionManager; import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.StoragePathUtil; @@ -1616,4 +1617,8 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String> public ITxnIdFactory getTxnIdFactory() { return appCtx.getTxnIdFactory(); } + + public ICompressionManager getCompressionManager() { // delegates to the application context + return appCtx.getCompressionManager(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index a25ed20..855cf78 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -25,8 +25,6 @@ import java.util.Map; import java.util.Objects; import java.util.stream.IntStream; -import org.apache.asterix.active.IActiveEntityEventsListener; -import org.apache.asterix.active.IActiveNotificationHandler; import org.apache.asterix.common.api.IDatasetInfoProvider; import org.apache.asterix.common.api.ILSMComponentIdGeneratorFactory; import org.apache.asterix.common.config.DatasetConfig.DatasetType; @@ -65,7 +63,7 @@ import org.apache.asterix.metadata.utils.RTreeResourceFactoryProvider; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.utils.RecordUtil; +import org.apache.asterix.runtime.compression.CompressionManager; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; @@ -149,6 +147,7 @@ public
class Dataset implements IMetadataEntity<Dataset>, IDataset { private final String metaTypeName; private final long rebalanceCount; private int pendingOp; + private final String compressionScheme; // block-level storage compression scheme name; CompressionManager.NONE when none is given public Dataset(String dataverseName, String datasetName, String recordTypeDataverseName, String recordTypeName, String nodeGroupName, String compactionPolicy, Map<String, String> compactionPolicyProperties, @@ -156,29 +155,30 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { int pendingOp) { this(dataverseName, datasetName, recordTypeDataverseName, recordTypeName, /*metaTypeDataverseName*/null, /*metaTypeName*/null, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, - hints, datasetType, datasetId, pendingOp); + hints, datasetType, datasetId, pendingOp, CompressionManager.NONE); } public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName, String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy, Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints, - DatasetType datasetType, int datasetId, int pendingOp) { + DatasetType datasetType, int datasetId, int pendingOp, String compressionScheme) { this(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, metaItemTypeDataverseName, metaItemTypeName, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, - datasetType, datasetId, pendingOp, 0L); + datasetType, datasetId, pendingOp, 0L, compressionScheme); } public Dataset(Dataset dataset) { this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName, dataset.metaTypeDataverseName, dataset.metaTypeName, dataset.nodeGroupName, dataset.compactionPolicyFactory, dataset.compactionPolicyProperties, dataset.datasetDetails, - dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp,
dataset.rebalanceCount); + dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp, dataset.rebalanceCount, + dataset.compressionScheme); } public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName, String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy, Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints, - DatasetType datasetType, int datasetId, int pendingOp, long rebalanceCount) { + DatasetType datasetType, int datasetId, int pendingOp, long rebalanceCount, String compressionScheme) { this.dataverseName = dataverseName; this.datasetName = datasetName; this.recordTypeName = itemTypeName; @@ -194,6 +194,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { this.pendingOp = pendingOp; this.hints = hints; this.rebalanceCount = rebalanceCount; + this.compressionScheme = compressionScheme; } @Override @@ -357,7 +358,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { new Dataset(dataverseName, datasetName, getItemTypeDataverseName(), getItemTypeName(), getMetaItemTypeDataverseName(), getMetaItemTypeName(), getNodeGroupName(), getCompactionPolicy(), getCompactionPolicyProperties(), getDatasetDetails(), getHints(), - getDatasetType(), getDatasetId(), MetadataUtil.PENDING_DROP_OP)); + getDatasetType(), getDatasetId(), MetadataUtil.PENDING_DROP_OP, getCompressionScheme())); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); bActiveTxn.setValue(false); @@ -644,6 +645,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { tree.put("metaTypeDataverseName", metaTypeDataverseName); tree.put("metaTypeName", metaTypeName); tree.put("pendingOp", MetadataUtil.pendingOpToString(pendingOp)); + tree.put("rebalanceCount", rebalanceCount); + tree.put("compressionScheme", compressionScheme); return tree; } @@ -823,7 +826,8 @@ public class
Dataset implements IMetadataEntity<Dataset>, IDataset { return new Dataset(this.dataverseName, this.datasetName, this.recordTypeDataverseName, this.recordTypeName, this.metaTypeDataverseName, this.metaTypeName, targetNodeGroupName, this.compactionPolicyFactory, this.compactionPolicyProperties, this.datasetDetails, this.hints, this.datasetType, - DatasetIdFactory.generateAlternatingDatasetId(this.datasetId), this.pendingOp, this.rebalanceCount + 1); + DatasetIdFactory.generateAlternatingDatasetId(this.datasetId), this.pendingOp, this.rebalanceCount + 1, + this.compressionScheme); } // Gets an array of partition numbers for this dataset. @@ -840,4 +844,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { public String getFullyQualifiedName() { return dataverseName + '.' + datasetName; } + + public String getCompressionScheme() { + return compressionScheme; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java index d5e179b..27978ab 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java @@ -60,10 +60,13 @@ import org.apache.asterix.om.base.ARecord; import org.apache.asterix.om.base.AString; import org.apache.asterix.om.base.AUnorderedList; import org.apache.asterix.om.base.IACursor; +import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; import
org.apache.asterix.om.types.AOrderedListType; +import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.AUnorderedListType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; +import org.apache.asterix.runtime.compression.CompressionManager; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -77,7 +80,6 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { private static final long serialVersionUID = 1L; // Payload field containing serialized Dataset. public static final int DATASET_PAYLOAD_TUPLE_FIELD_INDEX = 2; - private static final String REBALANCE_ID_FIELD_NAME = "rebalanceCount"; @SuppressWarnings("unchecked") protected final ISerializerDeserializer<ARecord> recordSerDes = @@ -256,14 +258,41 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { metaTypeName = ((AString) datasetRecord.getValueByPos(metaTypeNameIndex)).getStringValue(); } - // Read the rebalance count if there is one. - int rebalanceCountIndex = datasetRecord.getType().getFieldIndex(REBALANCE_ID_FIELD_NAME); - long rebalanceCount = rebalanceCountIndex >= 0 - ? ((AInt64) datasetRecord.getValueByPos(rebalanceCountIndex)).getLongValue() : 0; + long rebalanceCount = getRebalanceCount(datasetRecord); + String compressionScheme = getCompressionScheme(datasetRecord); return new Dataset(dataverseName, datasetName, typeDataverseName, typeName, metaTypeDataverseName, metaTypeName, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType, - datasetId, pendingOp, rebalanceCount); + datasetId, pendingOp, rebalanceCount, compressionScheme); + } + + private long getRebalanceCount(ARecord datasetRecord) { + // Read the rebalance count if there is one.
+ int rebalanceCountIndex = + datasetRecord.getType().getFieldIndex(MetadataRecordTypes.DATASET_ARECORD_REBALANCE_FIELD_NAME); + return rebalanceCountIndex >= 0 ? ((AInt64) datasetRecord.getValueByPos(rebalanceCountIndex)).getLongValue() + : 0; + } + + /** + * Reads the optional block-level storage compression scheme of a stored dataset record. + * + * @param datasetRecord the deserialized dataset metadata record + * @return the stored scheme name, or {@link CompressionManager#NONE} if the record has no compression field + */ + private String getCompressionScheme(ARecord datasetRecord) { + final ARecordType datasetType = datasetRecord.getType(); + final int compressionIndex = datasetType + .getFieldIndex(MetadataRecordTypes.DATASET_ARECORD_BLOCK_LEVEL_STORAGE_COMPRESSION_FIELD_NAME); + if (compressionIndex >= 0) { + final ARecordType compressionType = (ARecordType) datasetType.getFieldTypes()[compressionIndex]; + final int schemeIndex = compressionType + .getFieldIndex(MetadataRecordTypes.DATASET_ARECORD_DATASET_COMPRESSION_SCHEME_FIELD_NAME); + // schemeIndex is a position inside the nested compression record, not inside the dataset record. + final ARecord compressionRecord = (ARecord) datasetRecord.getValueByPos(compressionIndex); + return ((AString) compressionRecord.getValueByPos(schemeIndex)).getStringValue(); + } + return CompressionManager.NONE; } @Override @@ -392,8 +421,19 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { return tuple; } + /** + * Writes the optional open fields of the dataset record; kept protected so extensions can add additional fields + * + * @param dataset the dataset whose meta type, rebalance count and compression scheme are written when present + * @throws HyracksDataException if serializing a field fails + */ protected void writeOpenFields(Dataset dataset) throws HyracksDataException { - // write open fields + writeMetaPart(dataset); + writeRebalanceCount(dataset); + writeBlockLevelStorageCompression(dataset); + } + + private void writeMetaPart(Dataset dataset) throws HyracksDataException { if (dataset.hasMetaPart()) { // write open field 1, the meta item type Dataverse name.
fieldName.reset(); @@ -413,10 +453,35 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { stringSerde.serialize(aString, fieldValue.getDataOutput()); recordBuilder.addField(fieldName, fieldValue); } + } + + private void writeBlockLevelStorageCompression(Dataset dataset) throws HyracksDataException { + if (CompressionManager.NONE.equals(dataset.getCompressionScheme())) { + return; + } + RecordBuilder compressionObject = new RecordBuilder(); + compressionObject.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE); + fieldName.reset(); + aString.setValue(MetadataRecordTypes.DATASET_ARECORD_DATASET_COMPRESSION_SCHEME_FIELD_NAME); + stringSerde.serialize(aString, fieldName.getDataOutput()); + fieldValue.reset(); + aString.setValue(dataset.getCompressionScheme()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + compressionObject.addField(fieldName, fieldValue); + + fieldName.reset(); + aString.setValue(MetadataRecordTypes.DATASET_ARECORD_BLOCK_LEVEL_STORAGE_COMPRESSION_FIELD_NAME); + stringSerde.serialize(aString, fieldName.getDataOutput()); + fieldValue.reset(); + compressionObject.write(fieldValue.getDataOutput(), true); + recordBuilder.addField(fieldName, fieldValue); + } + + private void writeRebalanceCount(Dataset dataset) throws HyracksDataException { if (dataset.getRebalanceCount() > 0) { // Adds the field rebalanceCount.
fieldName.reset(); - aString.setValue("rebalanceCount"); + aString.setValue(MetadataRecordTypes.DATASET_ARECORD_REBALANCE_FIELD_NAME); stringSerde.serialize(aString, fieldName.getDataOutput()); fieldValue.reset(); aBigInt.setValue(dataset.getRebalanceCount()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java index b87ef2a..902ee41 100644 --- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java +++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java @@ -24,12 +24,12 @@ import java.util.HashMap; import java.util.Map; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure; import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy; import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.runtime.compression.CompressionManager; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.junit.Assert; @@ -51,9 +51,9 @@ public class DatasetTupleTranslatorTest { indicator == null ?
null : Collections.singletonList(indicator), Collections.singletonList(BuiltinType.AINT64), false, Collections.emptyList()); - Dataset dataset = - new Dataset("test", "log", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES", "prefix", - compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0); + Dataset dataset = new Dataset("test", "log", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES", + "prefix", compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0, + CompressionManager.NONE); DatasetTupleTranslator dtTranslator = new DatasetTupleTranslator(true); ITupleReference tuple = dtTranslator.getTupleFromMetadataEntity(dataset); Dataset deserializedDataset = dtTranslator.getMetadataEntityFromTuple(tuple); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java index ab4229c..7080dee 100644 --- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java +++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java @@ -30,7 +30,6 @@ import java.util.Map; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.metadata.MetadataNode; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Datatype; @@ 
-41,6 +40,7 @@ import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningS import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; +import org.apache.asterix.runtime.compression.CompressionManager; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.junit.Assert; @@ -62,9 +62,9 @@ public class IndexTupleTranslatorTest { indicator == null ? null : Collections.singletonList(indicator), Collections.singletonList(BuiltinType.AINT64), false, Collections.emptyList()); - Dataset dataset = - new Dataset("test", "d1", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES", "prefix", - compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0); + Dataset dataset = new Dataset("test", "d1", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES", + "prefix", compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0, + CompressionManager.NONE); Index index = new Index("test", "d1", "i1", IndexType.BTREE, Collections.singletonList(Collections.singletonList("row_id")), http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java index 99b7176..6d2658b 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java @@ -153,6 +153,14 @@ public class AdmObjectNode implements IAdmNode { return getString(this, field); } + public String 
getOptionalString(String field) { + final IAdmNode node = get(field); + if (node == null) { + return null; + } + return ((AdmStringNode) node).get(); + } + public static String getString(AdmObjectNode openFields, String field) throws HyracksDataException { IAdmNode node = openFields.get(field); if (node == null) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java new file mode 100644 index 0000000..3bffa9a --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.compression; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.asterix.common.config.StorageProperties; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.storage.ICompressionManager; +import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; +import org.apache.hyracks.api.io.IJsonSerializable; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; +import org.apache.hyracks.storage.common.compression.SnappyCompressorDecompressorFactory; + +public class CompressionManager implements ICompressionManager { + private static final Map<String, Class<? extends ICompressorDecompressorFactory>> REGISTERED_SCHEMES = + getRegisteredSchemes(); + public static final String NONE = "none"; + private final String defaultScheme; + + /* + * New compression schemes can be added by registering the name and the factory class + * + * WARNING: Changing scheme name will breakdown storage back compatibility. Before upgrading to a newer + * version of the registered schemes, make sure it is also back-compatible with the previous version. + */ + private static Map<String, Class<? extends ICompressorDecompressorFactory>> getRegisteredSchemes() { + final Map<String, Class<? 
extends ICompressorDecompressorFactory>> registeredSchemes = new HashMap<>(); + //No compression + registeredSchemes.put(NONE, NoOpCompressorDecompressorFactory.class); + registeredSchemes.put("snappy", SnappyCompressorDecompressorFactory.class); + return registeredSchemes; + } + + public CompressionManager(StorageProperties storageProperties) { + validateCompressionConfiguration(storageProperties); + defaultScheme = storageProperties.getCompressionScheme(); + } + + @Override + public ICompressorDecompressorFactory getFactory(String schemeName) throws CompilationException { + final String scheme = getDdlOrDefaultCompressionScheme(schemeName); + Class<? extends ICompressorDecompressorFactory> clazz = REGISTERED_SCHEMES.get(scheme); + try { + return clazz.newInstance(); + } catch (IllegalAccessException | InstantiationException e) { + throw new IllegalStateException("Failed to instantiate compressor/decompressor: " + scheme, e); + } + } + + @Override + public String getDdlOrDefaultCompressionScheme(String ddlScheme) throws CompilationException { + if (ddlScheme != null && !isRegisteredScheme(ddlScheme)) { + throw new CompilationException(ErrorCode.UNKNOWN_COMPRESSION_SCHEME, ddlScheme, formatSupportedValues()); + } + + return ddlScheme != null ? ddlScheme : defaultScheme; + } + + /** + * Register factory classes for persisted resources + * + * @param registeredClasses + */ + public static void registerCompressorDecompressorsFactoryClasses( + Map<String, Class<? extends IJsonSerializable>> registeredClasses) { + for (Class<? 
extends ICompressorDecompressorFactory> clazz : REGISTERED_SCHEMES.values()) { + registeredClasses.put(clazz.getSimpleName(), clazz); + } + } + + /** + * @param schemeName + * Compression scheme name + * @return + * true if it is registered + */ + private boolean isRegisteredScheme(String schemeName) { + return schemeName != null && REGISTERED_SCHEMES.containsKey(schemeName.toLowerCase()); + } + + /** + * Validate the configuration of StorageProperties + * + * @param storageProperties + */ + private void validateCompressionConfiguration(StorageProperties storageProperties) { + if (!isRegisteredScheme(storageProperties.getCompressionScheme())) { + final String option = StorageProperties.Option.STORAGE_COMPRESSION_BLOCK.ini(); + final String value = storageProperties.getCompressionScheme(); + throw new IllegalStateException("Invalid compression configuration (" + option + " = " + value + + "). Valid values are: " + formatSupportedValues()); + } + + } + + private String formatSupportedValues() { + final StringBuilder schemes = new StringBuilder(); + final Iterator<String> iterator = REGISTERED_SCHEMES.keySet().iterator(); + schemes.append('['); + schemes.append(iterator.next()); + while (iterator.hasNext()) { + schemes.append(','); + schemes.append(iterator.next()); + } + schemes.append(']'); + return schemes.toString(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index 4157e16..0d2a1df 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -24,7 +24,6 @@ import java.util.function.Supplier; import org.apache.asterix.common.api.ICoordinationService; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.api.INodeJobTracker; -import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.ActiveProperties; @@ -44,7 +43,10 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.replication.INcLifecycleCoordinator; +import org.apache.asterix.common.storage.ICompressionManager; import org.apache.asterix.common.transactions.IResourceIdManager; +import org.apache.asterix.common.transactions.ITxnIdFactory; +import org.apache.asterix.runtime.compression.CompressionManager; import org.apache.asterix.runtime.job.listener.NodeJobTracker; import org.apache.asterix.runtime.transaction.ResourceIdManager; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -87,6 +89,7 @@ public class CcApplicationContext implements ICcApplicationContext { private IClusterStateManager clusterStateManager; private final INodeJobTracker nodeJobTracker; private final ITxnIdFactory txnIdFactory; + private final ICompressionManager compressionManager; public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, @@ -121,6 +124,7 @@ public class CcApplicationContext implements ICcApplicationContext { this.resourceIdManager = new ResourceIdManager(clusterStateManager); nodeJobTracker = new NodeJobTracker(); txnIdFactory = new 
BulkTxnIdFactory(); + compressionManager = new CompressionManager(storageProperties); } @@ -270,7 +274,13 @@ public class CcApplicationContext implements ICcApplicationContext { return NoOpCoordinationService.INSTANCE; } + @Override public ITxnIdFactory getTxnIdFactory() { return txnIdFactory; } + + @Override + public ICompressionManager getCompressionManager() { + return compressionManager; + } }
