This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit c586d8897a971bf6ad9fa04e9b60aada37b9c247 Author: Dmitry Lychagin <[email protected]> AuthorDate: Fri Apr 10 14:14:59 2020 -0700 [NO ISSUE][MTD] Customizable dataset compaction policy in metadata - user model changes: no - storage format changes: no - interface changes: no Details: - Allow product extensions to customize how dataset's compaction policy is stored in the metadata Change-Id: I0216af5eabdf5ff269ba2d3feccf1371d273315b Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5224 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Dmitry Lychagin <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> --- .../DatasetTupleTranslator.java | 107 +++++++++++++-------- 1 file changed, 65 insertions(+), 42 deletions(-) diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java index dae6152..74f5076 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java @@ -25,6 +25,7 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.util.ArrayList; import java.util.Calendar; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; @@ -68,6 +69,7 @@ import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.compression.CompressionManager; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; @@ -135,20 +137,9 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { String nodeGroupName = ((AString) datasetRecord.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_GROUPNAME_FIELD_INDEX)) .getStringValue(); - String compactionPolicy = ((AString) datasetRecord - .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX)).getStringValue(); - IACursor cursor = ((AOrderedList) datasetRecord - .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX)) - .getCursor(); - Map<String, String> compactionPolicyProperties = new LinkedHashMap<>(); - String key; - String value; - while (cursor.next()) { - ARecord field = (ARecord) cursor.get(); - key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue(); - value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue(); - compactionPolicyProperties.put(key, value); - } + + Pair<String, Map<String, String>> compactionPolicy = readCompactionPolicy(datasetType, datasetRecord); + switch (datasetType) { case INTERNAL: { ARecord datasetDetailsRecord = (ARecord) datasetRecord @@ -159,7 +150,7 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { PartitioningStrategy partitioningStrategy = PartitioningStrategy.valueOf(((AString) datasetDetailsRecord .getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONSTRATEGY_FIELD_INDEX)) .getStringValue()); - cursor = ((AOrderedList) datasetDetailsRecord + IACursor cursor = ((AOrderedList) datasetDetailsRecord .getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONKEY_FIELD_INDEX)) .getCursor(); List<List<String>> partitioningKey = new ArrayList<>(); @@ -220,15 +211,15 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { String adapter = ((AString) datasetDetailsRecord .getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_DATASOURCE_ADAPTER_FIELD_INDEX)) .getStringValue(); - cursor = ((AOrderedList) datasetDetailsRecord + IACursor cursor = ((AOrderedList) datasetDetailsRecord .getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX)) .getCursor(); Map<String, String> properties = new HashMap<>(); while (cursor.next()) { ARecord field = (ARecord) cursor.get(); - key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)) + String key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)) .getStringValue(); - value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)) + String value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)) .getStringValue(); properties.put(key, value); } @@ -262,10 +253,34 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { String compressionScheme = getCompressionScheme(datasetRecord); return new Dataset(dataverseName, datasetName, typeDataverseName, typeName, metaTypeDataverseName, metaTypeName, - nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType, + nodeGroupName, compactionPolicy.first, compactionPolicy.second, datasetDetails, hints, datasetType, datasetId, pendingOp, rebalanceCount, compressionScheme); } + protected Pair<String, Map<String, String>> readCompactionPolicy(DatasetType datasetType, ARecord datasetRecord) { + + String compactionPolicy = ((AString) datasetRecord + .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX)).getStringValue(); + AOrderedList compactionPolicyPropertiesList = ((AOrderedList) datasetRecord + .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX)); + + Map<String, String> compactionPolicyProperties; + if (compactionPolicyPropertiesList.size() > 0) { + compactionPolicyProperties = new LinkedHashMap<>(); + for (IACursor cursor = compactionPolicyPropertiesList.getCursor(); cursor.next();) { + ARecord field = (ARecord) cursor.get(); + String key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)) + .getStringValue(); + String value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)) + .getStringValue(); + compactionPolicyProperties.put(key, value); + } + } else { + compactionPolicyProperties = Collections.emptyMap(); + } + return new Pair<>(compactionPolicy, compactionPolicyProperties); + } + private long getRebalanceCount(ARecord datasetRecord) { // Read the rebalance count if there is one. int rebalanceCountIndex = @@ -342,29 +357,9 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { stringSerde.serialize(aString, fieldValue.getDataOutput()); recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_GROUPNAME_FIELD_INDEX, fieldValue); - // write field 6 - fieldValue.reset(); - aString.setValue(dataset.getCompactionPolicy()); - stringSerde.serialize(aString, fieldValue.getDataOutput()); - recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX, fieldValue); - - // write field 7 - listBuilder.reset((AOrderedListType) MetadataRecordTypes.DATASET_RECORDTYPE - .getFieldTypes()[MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX]); - if (dataset.getCompactionPolicyProperties() != null) { - for (Map.Entry<String, String> property : dataset.getCompactionPolicyProperties().entrySet()) { - String name = property.getKey(); - String value = property.getValue(); - itemValue.reset(); - DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(), - MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE); - listBuilder.addItem(itemValue); - } - } - fieldValue.reset(); - listBuilder.write(fieldValue.getDataOutput(), true); - recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX, - fieldValue); + // write field 6/7 + writeCompactionPolicy(dataset.getDatasetType(), dataset.getCompactionPolicy(), + dataset.getCompactionPolicyProperties(), listBuilder, itemValue); // write field 8/9 fieldValue.reset(); @@ -414,6 +409,34 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { return tuple; } + protected void writeCompactionPolicy(DatasetType datasetType, String compactionPolicy, + Map<String, String> compactionPolicyProperties, OrderedListBuilder listBuilder, + ArrayBackedValueStorage itemValue) throws HyracksDataException { + // write field 6 + fieldValue.reset(); + aString.setValue(compactionPolicy); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX, fieldValue); + + // write field 7 + listBuilder.reset((AOrderedListType) MetadataRecordTypes.DATASET_RECORDTYPE + .getFieldTypes()[MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX]); + if (compactionPolicyProperties != null && !compactionPolicyProperties.isEmpty()) { + for (Map.Entry<String, String> property : compactionPolicyProperties.entrySet()) { + String name = property.getKey(); + String value = property.getValue(); + itemValue.reset(); + DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(), + MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE); + listBuilder.addItem(itemValue); + } + } + fieldValue.reset(); + listBuilder.write(fieldValue.getDataOutput(), true); + recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX, + fieldValue); + } + /** * Keep protected to allow other extensions to add additional fields *
