This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new a9dfde97b8 [ASTERIXDB-3132][OTH] Add schema structure
a9dfde97b8 is described below
commit a9dfde97b8ee56fbc377b1d8d2e1d0806c32b776
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Thu Mar 9 16:29:57 2023 -0800
[ASTERIXDB-3132][OTH] Add schema structure
- user mode changes: no
- storage format changes: no
- interface changes: no
Details:
Add schema structure, which allows evolvement (or changes)
Change-Id: I15b469ba64b3f4d561aaaff442fc92f71270a1d8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17417
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../metadata/AbstractColumnImmutableMetadata.java | 50 ++
.../AbstractColumnImmutableReadMetadata.java | 37 ++
.../column/metadata/AbstractColumnMetadata.java | 65 +++
.../column/metadata/FieldNamesDictionary.java | 220 ++++++++
.../column/metadata/PathInfoSerializer.java | 94 ++++
.../metadata/schema/AbstractSchemaNestedNode.java | 27 +
.../column/metadata/schema/AbstractSchemaNode.java | 89 ++++
.../column/metadata/schema/ISchemaNodeVisitor.java | 33 ++
.../column/metadata/schema/ObjectSchemaNode.java | 182 +++++++
.../column/metadata/schema/UnionSchemaNode.java | 146 ++++++
.../collection/AbstractCollectionSchemaNode.java | 97 ++++
.../schema/collection/ArraySchemaNode.java | 44 ++
.../schema/collection/MultisetSchemaNode.java | 43 ++
.../schema/primitive/MissingFieldSchemaNode.java | 33 ++
.../schema/primitive/PrimitiveSchemaNode.java | 88 ++++
.../schema/visitor/PathExtractorVisitor.java | 63 +++
.../visitor/SchemaBuilderFromIATypeVisitor.java | 133 +++++
.../operation/lsm/flush/FlushColumnMetadata.java | 572 +++++++++++++++++++++
18 files changed, 2016 insertions(+)
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableMetadata.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableMetadata.java
new file mode 100644
index 0000000000..c7b4651bdb
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Column metadata whose serialized form and number of columns are supplied once, at construction
+ * time, and handed back unchanged afterwards.
+ */
+public abstract class AbstractColumnImmutableMetadata extends AbstractColumnMetadata {
+    protected final IValueReference serializedMetadata;
+    protected final int numberOfColumns;
+
+    /**
+     * @param datasetType         dataset record type, forwarded to {@link AbstractColumnMetadata}
+     * @param metaType            meta record type, forwarded to {@link AbstractColumnMetadata}
+     * @param numberOfPrimaryKeys number of primary keys, forwarded to {@link AbstractColumnMetadata}
+     * @param serializedMetadata  already-serialized metadata returned by {@link #serializeColumnsMetadata()}
+     * @param numberOfColumns     value returned by {@link #getNumberOfColumns()}
+     */
+    protected AbstractColumnImmutableMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+            IValueReference serializedMetadata, int numberOfColumns) {
+        super(datasetType, metaType, numberOfPrimaryKeys);
+        this.serializedMetadata = serializedMetadata;
+        this.numberOfColumns = numberOfColumns;
+    }
+
+    /**
+     * @return the same serialized metadata reference that was passed to the constructor (no copy is made)
+     */
+    @Override
+    public final IValueReference serializeColumnsMetadata() {
+        return serializedMetadata;
+    }
+
+    @Override
+    public final void abort() throws HyracksDataException {
+        //NoOp as the metadata is immutable
+    }
+
+    /**
+     * @return the number of columns given at construction time
+     */
+    @Override
+    public int getNumberOfColumns() {
+        return numberOfColumns;
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
new file mode 100644
index 0000000000..5ac38d7f19
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.data.std.api.IValueReference;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+/**
+ * Immutable column metadata used on the read side. In addition to the inherited metadata it acts as
+ * an {@link IColumnProjectionInfo} and knows how to create the tuple reader that matches it.
+ */
+public abstract class AbstractColumnImmutableReadMetadata extends AbstractColumnImmutableMetadata
+        implements IColumnProjectionInfo {
+    /**
+     * Forwards every argument, unchanged, to {@link AbstractColumnImmutableMetadata}.
+     */
+    protected AbstractColumnImmutableReadMetadata(ARecordType datasetType, ARecordType metaType,
+            int numberOfPrimaryKeys, IValueReference serializedMetadata, int numberOfColumns) {
+        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, numberOfColumns);
+    }
+
+    /**
+     * @return the corresponding reader (merge reader or query reader) given <code>this</code> metadata
+     */
+    public abstract AbstractColumnTupleReader createTupleReader();
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java
new file mode 100644
index 0000000000..4e19cbc188
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+
+/**
+ * Common state shared by all column metadata implementations: the dataset and meta record types, the
+ * number of primary keys and the field indexes derived from it.
+ */
+public abstract class AbstractColumnMetadata implements IColumnMetadata {
+    // Byte offsets of five consecutive int-sized slots (0, 4, 8, 12, 16); OFFSETS_SIZE (20) is the
+    // total size of those slots. Presumably each slot holds the position of the named section inside
+    // the serialized metadata -- confirm against the classes that write/read them.
+    protected static final int WRITERS_POINTER = 0;
+    protected static final int FIELD_NAMES_POINTER = WRITERS_POINTER + Integer.BYTES;
+    protected static final int SCHEMA_POINTER = FIELD_NAMES_POINTER + Integer.BYTES;
+    protected static final int META_SCHEMA_POINTER = SCHEMA_POINTER + Integer.BYTES;
+    protected static final int PATH_INFO_POINTER = META_SCHEMA_POINTER + Integer.BYTES;
+    protected static final int OFFSETS_SIZE = PATH_INFO_POINTER + Integer.BYTES;
+    private final ARecordType datasetType;
+    private final ARecordType metaType;
+
+    private final int numberOfPrimaryKeys;
+    private final int recordFieldIndex;
+
+    /**
+     * @param datasetType         record type of the dataset
+     * @param metaType            record type of the meta part
+     * @param numberOfPrimaryKeys number of primary keys; also used as the record field index
+     */
+    protected AbstractColumnMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys) {
+        this.datasetType = datasetType;
+        this.metaType = metaType;
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        // The record field index is set to the number of primary keys
+        this.recordFieldIndex = numberOfPrimaryKeys;
+    }
+
+    public final ARecordType getDatasetType() {
+        return datasetType;
+    }
+
+    public final ARecordType getMetaType() {
+        return metaType;
+    }
+
+    public final int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    /**
+     * @return the index of the record field, which equals the number of primary keys
+     */
+    public final int getRecordFieldIndex() {
+        return recordFieldIndex;
+    }
+
+    /**
+     * @return the index of the meta record field, i.e. the record field index plus one
+     */
+    public final int getMetaRecordFieldIndex() {
+        return recordFieldIndex + 1;
+    }
+
+    /**
+     * @return the number of columns described by this metadata
+     */
+    public abstract int getNumberOfColumns();
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
new file mode 100644
index 0000000000..aa2e19464f
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import
org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+
+/**
+ * Assigns an integer index to every field name and keeps the structures needed to find that index
+ * again, either by the name's {@link String} form or by the hash of its serialized form.
+ * <p>
+ * Instances reuse internal buffers ({@code mutableString}, {@code lookupStorage}) across calls.
+ */
+public class FieldNamesDictionary {
+    //For both declared and inferred fields
+    private final List<IValueReference> fieldNames;
+    private final Object2IntMap<String> declaredFieldNamesToIndexMap;
+    private final Int2IntMap hashToFieldNameIndexMap;
+    private final IBinaryHashFunction fieldNameHashFunction;
+
+    //For declared fields
+    private final AMutableString mutableString;
+    private final AStringSerializerDeserializer stringSerDer;
+
+    //For lookups
+    private final ArrayBackedValueStorage lookupStorage;
+
+    /**
+     * Creates an empty dictionary.
+     */
+    public FieldNamesDictionary() {
+        this(new ArrayList<>(), new Object2IntOpenHashMap<>(), new Int2IntOpenHashMap());
+    }
+
+    /**
+     * Creates a dictionary backed by the given (possibly pre-populated) structures; used for both the
+     * empty and the deserialized case.
+     */
+    private FieldNamesDictionary(List<IValueReference> fieldNames, Object2IntMap<String> declaredFieldNamesToIndexMap,
+            Int2IntMap hashToFieldNameIndexMap) {
+        this.fieldNames = fieldNames;
+        this.declaredFieldNamesToIndexMap = declaredFieldNamesToIndexMap;
+        this.hashToFieldNameIndexMap = hashToFieldNameIndexMap;
+
+        mutableString = new AMutableString("");
+        stringSerDer = new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+        fieldNameHashFunction =
+                new PointableBinaryHashFunctionFactory(UTF8StringPointable.FACTORY).createBinaryHashFunction();
+        lookupStorage = new ArrayBackedValueStorage();
+    }
+
+    /**
+     * @return the backing list of serialized field names, ordered by index (not a copy)
+     */
+    public List<IValueReference> getFieldNames() {
+        return fieldNames;
+    }
+
+    /**
+     * Returns the index registered for the hash of the given serialized field name, registering a copy
+     * of the name under a new index if the hash is unknown.
+     *
+     * @param fieldName serialized field name
+     * @return the index associated with the hash of {@code fieldName}
+     */
+    //TODO solve collision (they're so rare that I haven't seen any)
+    public int getOrCreateFieldNameIndex(IValueReference fieldName) throws HyracksDataException {
+        int hash = getHash(fieldName);
+        if (hashToFieldNameIndexMap.containsKey(hash)) {
+            return hashToFieldNameIndexMap.get(hash);
+        }
+        // addFieldName() records hash -> index itself, so no additional put is required here
+        return addFieldName(createFieldName(fieldName), hash);
+    }
+
+    /**
+     * Returns the index registered for the given field name in the declared-names map, serializing and
+     * registering the name under a new index if it is not present in that map.
+     *
+     * @param fieldName field name as a string
+     * @return the index associated with {@code fieldName}
+     */
+    public int getOrCreateFieldNameIndex(String fieldName) throws HyracksDataException {
+        if (declaredFieldNamesToIndexMap.containsKey(fieldName)) {
+            return declaredFieldNamesToIndexMap.getInt(fieldName);
+        }
+        IValueReference serializedFieldName = createFieldName(fieldName);
+        int index = addFieldName(serializedFieldName, getHash(serializedFieldName));
+        declaredFieldNamesToIndexMap.put(fieldName, index);
+        return index;
+    }
+
+    /**
+     * Looks up a field name by the hash of its serialized form without registering it.
+     *
+     * @param fieldName field name as a string
+     * @return the index registered for the hash of {@code fieldName}, or {@code -1} if there is none
+     */
+    public int getFieldNameIndex(String fieldName) throws HyracksDataException {
+        // The lookup buffer is reused across calls, hence the reset
+        lookupStorage.reset();
+        serializeFieldName(fieldName, lookupStorage);
+        return hashToFieldNameIndexMap.getOrDefault(getHash(lookupStorage), -1);
+    }
+
+    /**
+     * Copies the given serialized field name into a newly allocated storage.
+     */
+    private ArrayBackedValueStorage createFieldName(IValueReference fieldName) throws HyracksDataException {
+        ArrayBackedValueStorage copy = new ArrayBackedValueStorage(fieldName.getLength());
+        copy.append(fieldName);
+        return copy;
+    }
+
+    /**
+     * Serializes the given string field name into a newly allocated storage.
+     */
+    private ArrayBackedValueStorage createFieldName(String fieldName) throws HyracksDataException {
+        ArrayBackedValueStorage serializedFieldName = new ArrayBackedValueStorage();
+        serializeFieldName(fieldName, serializedFieldName);
+        return serializedFieldName;
+    }
+
+    /**
+     * Appends the serialized form of {@code fieldName} to {@code storage} using the shared string
+     * serializer.
+     */
+    private void serializeFieldName(String fieldName, ArrayBackedValueStorage storage) throws HyracksDataException {
+        mutableString.setValue(fieldName);
+        stringSerDer.serialize(mutableString, storage.getDataOutput());
+    }
+
+    /**
+     * Hashes the bytes referenced by {@code fieldName} with the field-name hash function.
+     */
+    private int getHash(IValueReference fieldName) throws HyracksDataException {
+        byte[] object = fieldName.getByteArray();
+        int start = fieldName.getStartOffset();
+        int length = fieldName.getLength();
+
+        return fieldNameHashFunction.hash(object, start, length);
+    }
+
+    /**
+     * Appends {@code fieldName} to the list of names and maps {@code hash} to its new index.
+     *
+     * @return the index assigned to the appended name
+     */
+    private int addFieldName(IValueReference fieldName, int hash) {
+        int index = fieldNames.size();
+        hashToFieldNameIndexMap.put(hash, index);
+        fieldNames.add(fieldName);
+        return index;
+    }
+
+    /**
+     * @return the serialized field name stored at {@code index}
+     */
+    public IValueReference getFieldName(int index) {
+        return fieldNames.get(index);
+    }
+
+    /**
+     * Writes the dictionary to {@code output} in this order:
+     * <ol>
+     * <li>the number of field names, then each name as (length, bytes);</li>
+     * <li>the number of declared names, then each as (UTF string, index);</li>
+     * <li>every (hash, index) pair of the hash map, without a leading count.</li>
+     * </ol>
+     */
+    public void serialize(DataOutput output) throws IOException {
+        output.writeInt(fieldNames.size());
+        for (IValueReference fieldName : fieldNames) {
+            output.writeInt(fieldName.getLength());
+            output.write(fieldName.getByteArray(), fieldName.getStartOffset(), fieldName.getLength());
+        }
+
+        output.writeInt(declaredFieldNamesToIndexMap.size());
+        for (Object2IntMap.Entry<String> declaredFieldIndex : declaredFieldNamesToIndexMap.object2IntEntrySet()) {
+            output.writeUTF(declaredFieldIndex.getKey());
+            output.writeInt(declaredFieldIndex.getIntValue());
+        }
+
+        // NOTE(review): no entry count is written for this map; the reader consumes exactly
+        // fieldNames.size() pairs, so the format relies on the map and the list having the same size.
+        for (Int2IntMap.Entry hashIndex : hashToFieldNameIndexMap.int2IntEntrySet()) {
+            output.writeInt(hashIndex.getIntKey());
+            output.writeInt(hashIndex.getIntValue());
+        }
+    }
+
+    /**
+     * Reads a dictionary written by {@link #serialize(DataOutput)}.
+     *
+     * @return a new dictionary populated from {@code input}
+     */
+    public static FieldNamesDictionary deserialize(DataInput input) throws IOException {
+        int numberOfFieldNames = input.readInt();
+
+        List<IValueReference> fieldNames = new ArrayList<>();
+        deserializeFieldNames(input, fieldNames, numberOfFieldNames);
+
+        Object2IntMap<String> declaredFieldNamesToIndexMap = new Object2IntOpenHashMap<>();
+        deserializeDeclaredFieldNames(input, declaredFieldNamesToIndexMap);
+
+        Int2IntMap hashToFieldNameIndexMap = new Int2IntOpenHashMap();
+        deserializeHashToFieldNameIndex(input, hashToFieldNameIndexMap, numberOfFieldNames);
+
+        return new FieldNamesDictionary(fieldNames, declaredFieldNamesToIndexMap, hashToFieldNameIndexMap);
+    }
+
+    /**
+     * Discards the current content of this dictionary and repopulates it, in place, from a previously
+     * serialized dictionary.
+     *
+     * @param input stream positioned at the start of a serialized dictionary
+     */
+    public void abort(DataInputStream input) throws IOException {
+        int numberOfFieldNames = input.readInt();
+
+        fieldNames.clear();
+        deserializeFieldNames(input, fieldNames, numberOfFieldNames);
+
+        declaredFieldNamesToIndexMap.clear();
+        deserializeDeclaredFieldNames(input, declaredFieldNamesToIndexMap);
+
+        hashToFieldNameIndexMap.clear();
+        deserializeHashToFieldNameIndex(input, hashToFieldNameIndexMap, numberOfFieldNames);
+    }
+
+    /**
+     * Reads {@code numberOfFieldNames} (length, bytes) entries and appends them to {@code fieldNames}.
+     */
+    private static void deserializeFieldNames(DataInput input, List<IValueReference> fieldNames, int numberOfFieldNames)
+            throws IOException {
+
+        for (int i = 0; i < numberOfFieldNames; i++) {
+            int length = input.readInt();
+            ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage(length);
+            fieldName.setSize(length);
+            input.readFully(fieldName.getByteArray(), 0, length);
+            fieldNames.add(fieldName);
+        }
+    }
+
+    /**
+     * Reads a count followed by that many (UTF string, index) entries into the given map.
+     */
+    private static void deserializeDeclaredFieldNames(DataInput input,
+            Object2IntMap<String> declaredFieldNamesToIndexMap) throws IOException {
+        int numberOfDeclaredFieldNames = input.readInt();
+        for (int i = 0; i < numberOfDeclaredFieldNames; i++) {
+            String fieldName = input.readUTF();
+            int fieldNameIndex = input.readInt();
+            declaredFieldNamesToIndexMap.put(fieldName, fieldNameIndex);
+        }
+    }
+
+    /**
+     * Reads exactly {@code numberOfFieldNames} (hash, index) pairs into the given map.
+     */
+    private static void deserializeHashToFieldNameIndex(DataInput input, Int2IntMap hashToFieldNameIndexMap,
+            int numberOfFieldNames) throws IOException {
+        for (int i = 0; i < numberOfFieldNames; i++) {
+            int hash = input.readInt();
+            int fieldNameIndex = input.readInt();
+            hashToFieldNameIndexMap.put(hash, fieldNameIndex);
+        }
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/PathInfoSerializer.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/PathInfoSerializer.java
new file mode 100644
index 0000000000..f72b77bb94
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/PathInfoSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+/**
+ * Accumulates per-column path information while a schema is being traversed and writes it out.
+ * Primary-key entries and all other entries are buffered separately; primary-key entries are emitted
+ * first by {@link #serialize(DataOutput, int)}.
+ */
+public class PathInfoSerializer {
+    private final ArrayBackedValueStorage primaryKeyOutputPathStorage;
+    private final ArrayBackedValueStorage pathOutputStorage;
+    // Most recently entered collection is kept at position 0
+    private final IntList delimiters;
+    // Number of object/collection nodes currently entered
+    private int level;
+
+    public PathInfoSerializer() {
+        primaryKeyOutputPathStorage = new ArrayBackedValueStorage();
+        pathOutputStorage = new ArrayBackedValueStorage();
+        delimiters = new IntArrayList();
+        level = 0;
+    }
+
+    /**
+     * Clears both output buffers. The current level and the delimiters are left untouched.
+     */
+    public void reset() {
+        primaryKeyOutputPathStorage.reset();
+        pathOutputStorage.reset();
+    }
+
+    /**
+     * Records that the traversal entered {@code nestedNode}: for a collection, {@code level - 1} is
+     * inserted at the front of the delimiters; for an object or a collection, the level is incremented.
+     */
+    public void enter(AbstractSchemaNestedNode nestedNode) {
+        if (nestedNode.isCollection()) {
+            delimiters.add(0, level - 1);
+        }
+        if (nestedNode.isObjectOrCollection()) {
+            level++;
+        }
+    }
+
+    /**
+     * Undoes the effect of the matching {@link #enter(AbstractSchemaNestedNode)} call.
+     */
+    public void exit(AbstractSchemaNestedNode nestedNode) {
+        if (nestedNode.isCollection()) {
+            delimiters.removeInt(0);
+        }
+        if (nestedNode.isObjectOrCollection()) {
+            level--;
+        }
+    }
+
+    /**
+     * Appends one path entry to the primary-key buffer or to the regular buffer. The entry consists of
+     * the serialized type tag, the column index, the current level, the primary-key flag and a
+     * collection flag; when the collection flag is set, the number of delimiters and the delimiters
+     * themselves follow.
+     *
+     * @param typeTag     type tag of the column
+     * @param columnIndex index of the column
+     * @param primaryKey  whether the entry goes to the primary-key buffer
+     */
+    public void writePathInfo(ATypeTag typeTag, int columnIndex, boolean primaryKey) throws IOException {
+        DataOutput output =
+                primaryKey ? primaryKeyOutputPathStorage.getDataOutput() : pathOutputStorage.getDataOutput();
+        //type tag
+        output.write(typeTag.serialize());
+        //columnIndex
+        output.writeInt(columnIndex);
+        //maxLevel
+        output.writeInt(level);
+        //is primary key
+        output.writeBoolean(primaryKey);
+        //Is collection
+        boolean collection = !delimiters.isEmpty();
+        output.writeBoolean(collection);
+        if (collection) {
+            output.writeInt(delimiters.size());
+            for (int i = 0; i < delimiters.size(); i++) {
+                output.writeInt(delimiters.getInt(i));
+            }
+        }
+    }
+
+    /**
+     * Writes {@code numberOfColumns} followed by the buffered primary-key entries and then the
+     * buffered non-primary-key entries.
+     */
+    public void serialize(DataOutput output, int numberOfColumns) throws IOException {
+        output.writeInt(numberOfColumns);
+        output.write(primaryKeyOutputPathStorage.getByteArray(), 0, primaryKeyOutputPathStorage.getLength());
+        output.write(pathOutputStorage.getByteArray(), 0, pathOutputStorage.getLength());
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java
new file mode 100644
index 0000000000..187e4600f6
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+/**
+ * Base class for schema nodes that report themselves as nested.
+ */
+public abstract class AbstractSchemaNestedNode extends AbstractSchemaNode {
+
+ // Fixed to true for every subclass; declared final so it cannot be overridden
+ @Override
+ public final boolean isNested() {
+ return true;
+ }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
new file mode 100644
index 0000000000..c9d8635d97
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.MultisetSchemaNode;
+import
org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Root of the schema node hierarchy. Every node exposes its type tag, a few structural predicates, a
+ * visitor entry point and a way to serialize itself; {@link #deserialize(DataInput, Map)} rebuilds a
+ * node from its serialized form.
+ */
+public abstract class AbstractSchemaNode {
+    // Integer counter maintained by callers through the accessors below
+    private int counter;
+
+    public abstract ATypeTag getTypeTag();
+
+    public abstract boolean isNested();
+
+    public abstract boolean isObjectOrCollection();
+
+    public abstract boolean isCollection();
+
+    public final void incrementCounter() {
+        counter++;
+    }
+
+    public final void setCounter(int counter) {
+        this.counter = counter;
+    }
+
+    public final int getCounter() {
+        return counter;
+    }
+
+    public abstract <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException;
+
+    public abstract void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException;
+
+    /**
+     * Reads one byte from {@code input}, maps it to a type tag and creates the matching node. Nested
+     * nodes read the rest of their content from {@code input} in their constructors.
+     *
+     * @param input            source of the serialized node
+     * @param definitionLevels passed through to the constructors of nested nodes
+     * @return the deserialized node; {@link MissingFieldSchemaNode#INSTANCE} for {@code SYSTEM_NULL}
+     * @throws UnsupportedEncodingException if the type tag has no corresponding schema node
+     */
+    public static AbstractSchemaNode deserialize(DataInput input,
+            Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels) throws IOException {
+        ATypeTag typeTag = ATypeTag.VALUE_TYPE_MAPPING[input.readByte()];
+        switch (typeTag) {
+            case SYSTEM_NULL:
+                return MissingFieldSchemaNode.INSTANCE;
+            case OBJECT:
+                return new ObjectSchemaNode(input, definitionLevels);
+            case ARRAY:
+                return new ArraySchemaNode(input, definitionLevels);
+            case MULTISET:
+                return new MultisetSchemaNode(input, definitionLevels);
+            case UNION:
+                return new UnionSchemaNode(input, definitionLevels);
+            case NULL:
+            case MISSING:
+            case BOOLEAN:
+            case BIGINT:
+            case DOUBLE:
+            case STRING:
+            case UUID:
+                return new PrimitiveSchemaNode(typeTag, input);
+            default:
+                throw new UnsupportedEncodingException(typeTag + " is not supported");
+        }
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ISchemaNodeVisitor.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ISchemaNodeVisitor.java
new file mode 100644
index 0000000000..4d3815667c
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ISchemaNodeVisitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import
org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Visitor over the kinds of schema nodes: objects, collections, unions and primitives.
+ *
+ * @param <R> type of the value returned by each visit
+ * @param <T> type of the additional argument passed to each visit
+ */
+public interface ISchemaNodeVisitor<R, T> {
+    R visit(ObjectSchemaNode objectNode, T arg) throws HyracksDataException;
+
+    R visit(AbstractCollectionSchemaNode collectionNode, T arg) throws HyracksDataException;
+
+    R visit(UnionSchemaNode unionNode, T arg) throws HyracksDataException;
+
+    R visit(PrimitiveSchemaNode primitiveNode, T arg) throws HyracksDataException;
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
new file mode 100644
index 0000000000..a230e864e1
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import
org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.util.annotations.CriticalPath;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap.Entry;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntImmutableList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+/**
+ * Schema node of an object. Children are kept in a list, and a map translates the dictionary index of a
+ * field name into the position of the corresponding child in that list.
+ */
+public final class ObjectSchemaNode extends AbstractSchemaNestedNode {
+    private final Int2IntMap fieldNameIndexToChildIndexMap;
+    private final List<AbstractSchemaNode> children;
+
+    /**
+     * Creates an object node without any children.
+     */
+    public ObjectSchemaNode() {
+        fieldNameIndexToChildIndexMap = new Int2IntOpenHashMap();
+        children = new ArrayList<>();
+    }
+
+    /**
+     * Reads an object node back from the layout produced by {@link #serialize(DataOutput, PathInfoSerializer)}.
+     * The leading type tag written by {@code serialize} is not read here; reading starts at the child count.
+     *
+     * @param input            source of the serialized node
+     * @param definitionLevels if not {@code null}, this node is registered in it with a fresh
+     *                         {@link RunLengthIntArray}
+     * @throws IOException if reading from {@code input} fails
+     */
+    ObjectSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        if (definitionLevels != null) {
+            definitionLevels.put(this, new RunLengthIntArray());
+        }
+        int numberOfChildren = input.readInt();
+
+        fieldNameIndexToChildIndexMap = new Int2IntOpenHashMap();
+        deserializeFieldNameIndexToChildIndex(input, fieldNameIndexToChildIndexMap, numberOfChildren);
+
+        children = new ArrayList<>();
+        deserializeChildren(input, children, numberOfChildren, definitionLevels);
+    }
+
+    /**
+     * Returns the child stored under {@code fieldName}, creating (or replacing) it through
+     * {@code columnMetadata} when needed.
+     *
+     * @param fieldName      name of the field
+     * @param childTypeTag   type tag requested for the child
+     * @param columnMetadata provides the field-name dictionary and creates/adjusts child nodes
+     * @return the node returned by {@code columnMetadata.getOrCreateChild(...)}
+     * @throws HyracksDataException propagated from {@code columnMetadata}
+     */
+    public AbstractSchemaNode getOrCreateChild(IValueReference fieldName, ATypeTag childTypeTag,
+            FlushColumnMetadata columnMetadata) throws HyracksDataException {
+        int numberOfChildren = children.size();
+        int fieldNameIndex = columnMetadata.getFieldNamesDictionary().getOrCreateFieldNameIndex(fieldName);
+        // An unknown field maps to the next free position, which doubles as the "not found" marker
+        int childIndex = fieldNameIndexToChildIndexMap.getOrDefault(fieldNameIndex, numberOfChildren);
+        AbstractSchemaNode currentChild = childIndex == numberOfChildren ? null : children.get(childIndex);
+        AbstractSchemaNode newChild = columnMetadata.getOrCreateChild(currentChild, childTypeTag);
+        if (currentChild == null) {
+            children.add(childIndex, newChild);
+            fieldNameIndexToChildIndexMap.put(fieldNameIndex, childIndex);
+        } else if (currentChild != newChild) {
+            // The metadata handed back a different node for this field; swap it in place
+            children.set(childIndex, newChild);
+        }
+
+        return newChild;
+    }
+
+    /**
+     * Appends {@code child} and maps {@code fieldNameIndex} to its position.
+     *
+     * @param fieldNameIndex dictionary index of the field name
+     * @param child          node to append
+     */
+    public void addChild(int fieldNameIndex, AbstractSchemaNode child) {
+        int childIndex = children.size();
+        fieldNameIndexToChildIndexMap.put(fieldNameIndex, childIndex);
+        children.add(child);
+    }
+
+    /**
+     * @param fieldNameIndex dictionary index of the field name
+     * @return the child mapped to {@code fieldNameIndex}, or {@link MissingFieldSchemaNode#INSTANCE} if there
+     *         is none
+     */
+    public AbstractSchemaNode getChild(int fieldNameIndex) {
+        if (!fieldNameIndexToChildIndexMap.containsKey(fieldNameIndex)) {
+            return MissingFieldSchemaNode.INSTANCE;
+        }
+        return children.get(fieldNameIndexToChildIndexMap.get(fieldNameIndex));
+    }
+
+    /**
+     * Removes the child mapped to {@code fieldNameIndex}. Does nothing if no child is mapped to it.
+     *
+     * @param fieldNameIndex dictionary index of the field name
+     */
+    public void removeChild(int fieldNameIndex) {
+        if (!fieldNameIndexToChildIndexMap.containsKey(fieldNameIndex)) {
+            // Without this guard, remove() yields the map's default return value (0 unless reconfigured)
+            // for an absent key, and the first child would be dropped by mistake
+            return;
+        }
+        int childIndex = fieldNameIndexToChildIndexMap.remove(fieldNameIndex);
+        children.remove(childIndex);
+        // Children that followed the removed one moved one position to the left; keep the map in sync
+        for (Int2IntMap.Entry entry : fieldNameIndexToChildIndexMap.int2IntEntrySet()) {
+            int mappedIndex = entry.getIntValue();
+            if (mappedIndex > childIndex) {
+                entry.setValue(mappedIndex - 1);
+            }
+        }
+    }
+
+    /**
+     * @return the backing list of children (not a copy)
+     */
+    public List<AbstractSchemaNode> getChildren() {
+        return children;
+    }
+
+    /**
+     * Returns the field-name indexes ordered by the position of their children.
+     * Should not be used in a {@link CriticalPath}
+     */
+    public IntList getChildrenFieldNameIndexes() {
+        return IntImmutableList.toList(fieldNameIndexToChildIndexMap.int2IntEntrySet().stream()
+                .sorted(Comparator.comparingInt(Entry::getIntValue)).mapToInt(Entry::getIntKey));
+    }
+
+    /**
+     * @param fieldNameIndex dictionary index of the field name
+     * @return {@code true} if a child is mapped to {@code fieldNameIndex}
+     */
+    public boolean containsField(int fieldNameIndex) {
+        return fieldNameIndexToChildIndexMap.containsKey(fieldNameIndex);
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.OBJECT;
+    }
+
+    @Override
+    public boolean isObjectOrCollection() {
+        return true;
+    }
+
+    @Override
+    public boolean isCollection() {
+        return false;
+    }
+
+    @Override
+    public <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+        return visitor.visit(this, arg);
+    }
+
+    /**
+     * Writes, in order: the OBJECT type tag, the number of children, one (field-name index, child index)
+     * pair per map entry, and then every child. The children are written between
+     * {@code pathInfoSerializer.enter(this)} and {@code pathInfoSerializer.exit(this)}.
+     */
+    @Override
+    public void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+        output.write(ATypeTag.OBJECT.serialize());
+        output.writeInt(children.size());
+        for (Int2IntMap.Entry fieldNameIndexChildIndex : fieldNameIndexToChildIndexMap.int2IntEntrySet()) {
+            output.writeInt(fieldNameIndexChildIndex.getIntKey());
+            output.writeInt(fieldNameIndexChildIndex.getIntValue());
+        }
+        pathInfoSerializer.enter(this);
+        for (AbstractSchemaNode child : children) {
+            child.serialize(output, pathInfoSerializer);
+        }
+        pathInfoSerializer.exit(this);
+    }
+
+    /**
+     * Discards the current mapping and children and re-reads both from {@code input}, which is read in the
+     * same way as by the deserializing constructor.
+     *
+     * @param input            source of the serialized node
+     * @param definitionLevels this node is registered in it with a fresh {@link RunLengthIntArray}; unlike
+     *                         in the deserializing constructor, it must not be {@code null}
+     * @throws IOException if reading from {@code input} fails
+     */
+    public void abort(DataInputStream input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        definitionLevels.put(this, new RunLengthIntArray());
+
+        int numberOfChildren = input.readInt();
+
+        fieldNameIndexToChildIndexMap.clear();
+        deserializeFieldNameIndexToChildIndex(input, fieldNameIndexToChildIndexMap, numberOfChildren);
+
+        children.clear();
+        deserializeChildren(input, children, numberOfChildren, definitionLevels);
+    }
+
+    /** Reads {@code numberOfChildren} (field-name index, child index) pairs into the given map. */
+    private static void deserializeFieldNameIndexToChildIndex(DataInput input,
+            Int2IntMap fieldNameIndexToChildIndexMap, int numberOfChildren) throws IOException {
+        for (int i = 0; i < numberOfChildren; i++) {
+            int fieldNameIndex = input.readInt();
+            int childIndex = input.readInt();
+            fieldNameIndexToChildIndexMap.put(fieldNameIndex, childIndex);
+        }
+    }
+
+    /** Reads {@code numberOfChildren} serialized nodes and appends them to {@code children}. */
+    private static void deserializeChildren(DataInput input, List<AbstractSchemaNode> children,
+            int numberOfChildren, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        for (int i = 0; i < numberOfChildren; i++) {
+            children.add(AbstractSchemaNode.deserialize(input, definitionLevels));
+        }
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
new file mode 100644
index 0000000000..4c067bd5f9
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import
org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Schema node of a union: a set of alternative children, at most one per {@link ATypeTag}. The node also
+ * keeps a reference to its "original" type, i.e., the first child it was created with.
+ */
+public final class UnionSchemaNode extends AbstractSchemaNestedNode {
+    private final AbstractSchemaNode originalType;
+    private final Map<ATypeTag, AbstractSchemaNode> children;
+
+    /**
+     * Creates a union out of two nodes; {@code child1} becomes the original type.
+     *
+     * @param child1 first alternative (also the original type)
+     * @param child2 second alternative
+     */
+    public UnionSchemaNode(AbstractSchemaNode child1, AbstractSchemaNode child2) {
+        children = new EnumMap<>(ATypeTag.class);
+        originalType = child1;
+        putChild(child1);
+        putChild(child2);
+    }
+
+    /**
+     * Reads a union back from the layout produced by {@link #serialize(DataOutput, PathInfoSerializer)}.
+     * The leading UNION type tag is not read here; reading starts at the original type's tag.
+     *
+     * @param input            source of the serialized node
+     * @param definitionLevels if not {@code null}, this node is registered in it with a fresh
+     *                         {@link RunLengthIntArray}
+     * @throws IOException if reading from {@code input} fails
+     */
+    UnionSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        if (definitionLevels != null) {
+            definitionLevels.put(this, new RunLengthIntArray());
+        }
+        ATypeTag originalTypeTag = ATypeTag.VALUE_TYPE_MAPPING[input.readByte()];
+        int numberOfChildren = input.readInt();
+        children = new EnumMap<>(ATypeTag.class);
+        for (int remaining = numberOfChildren; remaining > 0; remaining--) {
+            putChild(AbstractSchemaNode.deserialize(input, definitionLevels));
+        }
+        originalType = children.get(originalTypeTag);
+    }
+
+    /** Stores {@code child} under its own type tag, replacing any previous child with that tag. */
+    private void putChild(AbstractSchemaNode child) {
+        children.put(child.getTypeTag(), child);
+    }
+
+    /** Tells whether the node's type tag belongs to the numeric domain. */
+    private static boolean isNumeric(AbstractSchemaNode node) {
+        return ATypeHierarchy.getTypeDomain(node.getTypeTag()) == ATypeHierarchy.Domain.NUMERIC;
+    }
+
+    /**
+     * @return the node this union was originally created from
+     */
+    public AbstractSchemaNode getOriginalType() {
+        return originalType;
+    }
+
+    /**
+     * Returns the child for the normalized form of {@code childTypeTag}, creating (or replacing) it through
+     * {@code columnMetadata} when needed.
+     *
+     * @param childTypeTag   requested type tag (normalized before lookup)
+     * @param columnMetadata creates/adjusts child nodes
+     * @return the node returned by {@code columnMetadata.getOrCreateChild(...)}
+     * @throws HyracksDataException propagated from {@code columnMetadata}
+     */
+    public AbstractSchemaNode getOrCreateChild(ATypeTag childTypeTag, FlushColumnMetadata columnMetadata)
+            throws HyracksDataException {
+        ATypeTag normalizedTypeTag = FlushColumnMetadata.getNormalizedTypeTag(childTypeTag);
+        AbstractSchemaNode existing = children.get(normalizedTypeTag);
+        //The parent of a union child should be the actual parent
+        AbstractSchemaNode result = columnMetadata.getOrCreateChild(existing, normalizedTypeTag);
+        if (result != existing) {
+            putChild(result);
+        }
+        return result;
+    }
+
+    /**
+     * @param typeTag type tag of the wanted alternative
+     * @return the child stored for {@code typeTag}, or {@link MissingFieldSchemaNode#INSTANCE} if there is none
+     */
+    public AbstractSchemaNode getChild(ATypeTag typeTag) {
+        return children.getOrDefault(typeTag, MissingFieldSchemaNode.INSTANCE);
+    }
+
+    /**
+     * @return the backing map of children (not a copy)
+     */
+    public Map<ATypeTag, AbstractSchemaNode> getChildren() {
+        return children;
+    }
+
+    @Override
+    public boolean isObjectOrCollection() {
+        return false;
+    }
+
+    @Override
+    public boolean isCollection() {
+        return false;
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.UNION;
+    }
+
+    @Override
+    public <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+        return visitor.visit(this, arg);
+    }
+
+    /**
+     * Writes, in order: the UNION type tag, the original type's tag, the number of children, and then every
+     * child. The children are written between {@code pathInfoSerializer.enter(this)} and
+     * {@code pathInfoSerializer.exit(this)}.
+     */
+    @Override
+    public void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+        output.write(ATypeTag.UNION.serialize());
+        output.writeByte(originalType.getTypeTag().serialize());
+        output.writeInt(children.size());
+        pathInfoSerializer.enter(this);
+        for (AbstractSchemaNode alternative : children.values()) {
+            alternative.serialize(output, pathInfoSerializer);
+        }
+        pathInfoSerializer.exit(this);
+    }
+
+    /**
+     * This would return any numeric node
+     *
+     * @return first numeric node or missing node
+     * @see org.apache.asterix.column.operation.query.SchemaClipperVisitor
+     */
+    public AbstractSchemaNode getNumericChildOrMissing() {
+        for (AbstractSchemaNode candidate : children.values()) {
+            if (isNumeric(candidate)) {
+                return candidate;
+            }
+        }
+        return MissingFieldSchemaNode.INSTANCE;
+    }
+
+    /**
+     * @return how many children have a type tag in the numeric domain
+     */
+    public int getNumberOfNumericChildren() {
+        int numericChildren = 0;
+        for (AbstractSchemaNode candidate : children.values()) {
+            if (isNumeric(candidate)) {
+                numericChildren++;
+            }
+        }
+
+        return numericChildren;
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java
new file mode 100644
index 0000000000..8455864ea4
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.collection;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base class of collection schema nodes. A collection has a single item node, which is {@code null}
+ * until it is set or created.
+ */
+public abstract class AbstractCollectionSchemaNode extends AbstractSchemaNestedNode {
+    private AbstractSchemaNode item;
+
+    /**
+     * Creates a collection node whose item node is not set yet.
+     */
+    AbstractCollectionSchemaNode() {
+        item = null;
+    }
+
+    /**
+     * Reads a collection node back from the layout produced by
+     * {@link #serialize(DataOutput, PathInfoSerializer)}. The leading type tag is not read here; only the
+     * item node is.
+     *
+     * @param input            source of the serialized node
+     * @param definitionLevels if not {@code null}, this node is registered in it with a fresh
+     *                         {@link RunLengthIntArray}
+     * @throws IOException if reading from {@code input} fails
+     */
+    AbstractCollectionSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        if (definitionLevels != null) {
+            definitionLevels.put(this, new RunLengthIntArray());
+        }
+        item = AbstractSchemaNode.deserialize(input, definitionLevels);
+    }
+
+    /**
+     * Asks {@code columnMetadata} for the item node matching {@code childTypeTag}, stores whatever it
+     * returns as the item node, and returns it.
+     *
+     * @param childTypeTag   type tag requested for the item
+     * @param columnMetadata creates/adjusts child nodes
+     * @return the (possibly new) item node
+     * @throws HyracksDataException propagated from {@code columnMetadata}
+     */
+    public final AbstractSchemaNode getOrCreateItem(ATypeTag childTypeTag, FlushColumnMetadata columnMetadata)
+            throws HyracksDataException {
+        // Re-assigning the same reference is harmless, so no identity check is needed
+        item = columnMetadata.getOrCreateChild(item, childTypeTag);
+        return item;
+    }
+
+    /**
+     * @return the item node, or {@code null} if none was set or created
+     */
+    public final AbstractSchemaNode getItemNode() {
+        return item;
+    }
+
+    /**
+     * @param item the new item node
+     */
+    public final void setItemNode(AbstractSchemaNode item) {
+        this.item = item;
+    }
+
+    @Override
+    public final <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+        return visitor.visit(this, arg);
+    }
+
+    @Override
+    public final boolean isObjectOrCollection() {
+        return true;
+    }
+
+    @Override
+    public final boolean isCollection() {
+        return true;
+    }
+
+    /**
+     * Writes this node's type tag followed by the item node, the latter between
+     * {@code pathInfoSerializer.enter(this)} and {@code pathInfoSerializer.exit(this)}.
+     */
+    @Override
+    public final void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+        output.write(getTypeTag().serialize());
+        pathInfoSerializer.enter(this);
+        item.serialize(output, pathInfoSerializer);
+        pathInfoSerializer.exit(this);
+    }
+
+    /**
+     * @param typeTag type tag of the collection
+     * @return an {@link ArraySchemaNode} for {@link ATypeTag#ARRAY}; a {@link MultisetSchemaNode} for any
+     *         other tag
+     */
+    public static AbstractCollectionSchemaNode create(ATypeTag typeTag) {
+        return typeTag == ATypeTag.ARRAY ? new ArraySchemaNode() : new MultisetSchemaNode();
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/ArraySchemaNode.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/ArraySchemaNode.java
new file mode 100644
index 0000000000..084a434d3b
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/ArraySchemaNode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.collection;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * Collection schema node whose type tag is {@link ATypeTag#ARRAY}.
+ */
+public final class ArraySchemaNode extends AbstractCollectionSchemaNode {
+
+    /**
+     * Creates an array node whose item node is not set yet.
+     */
+    public ArraySchemaNode() {
+        // The superclass no-arg constructor is invoked implicitly
+    }
+
+    /**
+     * Reads an array node from {@code input}; see the superclass constructor for details.
+     *
+     * @param input            source of the serialized node
+     * @param definitionLevels passed through to the superclass constructor
+     * @throws IOException if reading from {@code input} fails
+     */
+    public ArraySchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        super(input, definitionLevels);
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.ARRAY;
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/MultisetSchemaNode.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/MultisetSchemaNode.java
new file mode 100644
index 0000000000..af27a5aa24
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/MultisetSchemaNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.collection;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * Collection schema node whose type tag is {@link ATypeTag#MULTISET}.
+ */
+public final class MultisetSchemaNode extends AbstractCollectionSchemaNode {
+    /**
+     * Creates a multiset node whose item node is not set yet.
+     */
+    public MultisetSchemaNode() {
+        // The superclass no-arg constructor is invoked implicitly
+    }
+
+    /**
+     * Reads a multiset node from {@code input}; see the superclass constructor for details.
+     *
+     * @param input            source of the serialized node
+     * @param definitionLevels passed through to the superclass constructor
+     * @throws IOException if reading from {@code input} fails
+     */
+    public MultisetSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        super(input, definitionLevels);
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.MULTISET;
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/MissingFieldSchemaNode.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/MissingFieldSchemaNode.java
new file mode 100644
index 0000000000..98f408e7fd
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/MissingFieldSchemaNode.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.primitive;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * A special schema node that stands for a non-existing object or union field. It is a primitive node with
+ * type tag {@link ATypeTag#MISSING}, column index -1, and is not a primary key. Only the shared
+ * {@link #INSTANCE} exists.
+ */
+public final class MissingFieldSchemaNode extends PrimitiveSchemaNode {
+    // Column index carried by the missing node (compile-time constant, so its position relative to
+    // INSTANCE does not matter)
+    private static final int MISSING_COLUMN_INDEX = -1;
+
+    public static final AbstractSchemaNode INSTANCE = new MissingFieldSchemaNode();
+
+    private MissingFieldSchemaNode() {
+        super(MISSING_COLUMN_INDEX, ATypeTag.MISSING, false);
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java
new file mode 100644
index 0000000000..28d379d51b
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.primitive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Leaf schema node: it records a column index, a type tag, and whether the column is a primary key.
+ */
+public class PrimitiveSchemaNode extends AbstractSchemaNode {
+    private final int columnIndex;
+    private final ATypeTag typeTag;
+    private final boolean primaryKey;
+
+    /**
+     * @param columnIndex index of the column
+     * @param typeTag     type tag of the column's values
+     * @param primaryKey  whether the column is a primary key
+     */
+    public PrimitiveSchemaNode(int columnIndex, ATypeTag typeTag, boolean primaryKey) {
+        this.columnIndex = columnIndex;
+        this.typeTag = typeTag;
+        this.primaryKey = primaryKey;
+    }
+
+    /**
+     * Reads the column index (int) and then the primary-key flag (boolean) from {@code input}; the type
+     * tag is supplied by the caller.
+     *
+     * @param typeTag type tag of the node
+     * @param input   source of the serialized node
+     * @throws IOException if reading from {@code input} fails
+     */
+    public PrimitiveSchemaNode(ATypeTag typeTag, DataInput input) throws IOException {
+        // Arguments are evaluated left to right, so the int is read before the boolean
+        this(input.readInt(), typeTag, input.readBoolean());
+    }
+
+    /**
+     * @return the column index of this node
+     */
+    public final int getColumnIndex() {
+        return columnIndex;
+    }
+
+    /**
+     * @return whether this node was flagged as a primary key
+     */
+    public final boolean isPrimaryKey() {
+        return primaryKey;
+    }
+
+    @Override
+    public final ATypeTag getTypeTag() {
+        return typeTag;
+    }
+
+    @Override
+    public final boolean isNested() {
+        return false;
+    }
+
+    @Override
+    public final boolean isObjectOrCollection() {
+        return false;
+    }
+
+    @Override
+    public final boolean isCollection() {
+        return false;
+    }
+
+    @Override
+    public final <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+        return visitor.visit(this, arg);
+    }
+
+    /**
+     * Writes the type tag, the column index, and the primary-key flag to {@code output}, then hands the
+     * same three values to {@code pathInfoSerializer.writePathInfo(...)}.
+     */
+    @Override
+    public void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+        output.write(typeTag.serialize());
+        output.writeInt(columnIndex);
+        output.writeBoolean(primaryKey);
+        pathInfoSerializer.writePathInfo(typeTag, columnIndex, primaryKey);
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/PathExtractorVisitor.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/PathExtractorVisitor.java
new file mode 100644
index 0000000000..2917074393
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/PathExtractorVisitor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.visitor;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import
org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import
org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Walks down a schema by following the first child at every nested level and returns the primitive node
+ * found at the end. {@link MissingFieldSchemaNode#INSTANCE} is returned when a level has no child to
+ * follow.
+ */
+public class PathExtractorVisitor implements ISchemaNodeVisitor<AbstractSchemaNode, Void> {
+    /**
+     * Follows the child with the smallest child position.
+     *
+     * @return the primitive node reached through that child, or the missing node if the object has no
+     *         children
+     */
+    @Override
+    public AbstractSchemaNode visit(ObjectSchemaNode objectNode, Void arg) throws HyracksDataException {
+        if (objectNode.getChildren().isEmpty()) {
+            // Nothing to follow; asking for the first field-name index would throw
+            return MissingFieldSchemaNode.INSTANCE;
+        }
+        int fieldNameIndex = objectNode.getChildrenFieldNameIndexes().getInt(0);
+        if (fieldNameIndex < 0) {
+            return MissingFieldSchemaNode.INSTANCE;
+        }
+        return objectNode.getChild(fieldNameIndex).accept(this, null);
+    }
+
+    /**
+     * Follows the item node.
+     *
+     * @return the primitive node reached through the item, or the missing node if the item is not set
+     */
+    @Override
+    public AbstractSchemaNode visit(AbstractCollectionSchemaNode collectionNode, Void arg)
+            throws HyracksDataException {
+        AbstractSchemaNode itemNode = collectionNode.getItemNode();
+        if (itemNode == null) {
+            return MissingFieldSchemaNode.INSTANCE;
+        }
+        return itemNode.accept(this, null);
+    }
+
+    /**
+     * Follows the first child in the iteration order of the union's children.
+     *
+     * @return the primitive node reached through that child, or the missing node if the union is empty
+     */
+    @Override
+    public AbstractSchemaNode visit(UnionSchemaNode unionNode, Void arg) throws HyracksDataException {
+        for (AbstractSchemaNode node : unionNode.getChildren().values()) {
+            // A 'for-loop' that returns on its first iteration is used to get the first child out of the
+            // collection
+            return node.accept(this, null);
+        }
+        return MissingFieldSchemaNode.INSTANCE;
+    }
+
+    /**
+     * @return {@code primitiveNode} itself
+     */
+    @Override
+    public AbstractSchemaNode visit(PrimitiveSchemaNode primitiveNode, Void arg) throws HyracksDataException {
+        //Missing column index is -1
+        return primitiveNode;
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
new file mode 100644
index 0000000000..fb098faa1f
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.visitor;
+
+import java.util.List;
+
+import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.IATypeVisitor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Builds schema nodes out of a declared {@link IAType}. The node that corresponds to the visited type is
+ * passed as the visitor argument; children are created through {@link FlushColumnMetadata}. Primary-key
+ * paths are processed first when a record is visited.
+ */
+public class SchemaBuilderFromIATypeVisitor implements IATypeVisitor<Void, AbstractSchemaNode> {
+    private final FlushColumnMetadata columnMetadata;
+    private final List<List<String>> primaryKeys;
+    private List<String> currentPrimaryKeyPath;
+    private int processedPrimaryKeys;
+    private int currentPathIndex;
+
+    /**
+     * @param columnMetadata metadata used to create the schema nodes
+     * @param primaryKeys    primary-key paths, each given as a list of field names
+     */
+    public SchemaBuilderFromIATypeVisitor(FlushColumnMetadata columnMetadata, List<List<String>> primaryKeys) {
+        this.columnMetadata = columnMetadata;
+        this.primaryKeys = primaryKeys;
+        processedPrimaryKeys = 0;
+    }
+
+    /**
+     * Processes the pending primary-key paths (if any) and then every declared field of the record.
+     *
+     * @param recordType declared record type
+     * @param arg        the {@link ObjectSchemaNode} that corresponds to {@code recordType}
+     * @throws IllegalStateException wrapping any {@link HyracksDataException} raised while building
+     */
+    @Override
+    public Void visit(ARecordType recordType, AbstractSchemaNode arg) {
+        ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
+        columnMetadata.enterLevel(objectNode);
+        try {
+            if (processedPrimaryKeys < primaryKeys.size()) {
+                processPrimaryKeys(recordType, objectNode);
+            }
+            for (int i = 0; i < recordType.getFieldTypes().length; i++) {
+                processField(i, recordType, objectNode);
+            }
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+        columnMetadata.exitLevel(objectNode);
+        return null;
+    }
+
+    /**
+     * Creates the item node of the collection and visits the item type with it. Any collection node is
+     * accepted (an {@link ArraySchemaNode} as well as a multiset node), since only
+     * {@link AbstractCollectionSchemaNode#getOrCreateItem} is needed.
+     *
+     * @param collectionType declared collection type
+     * @param arg            the collection node that corresponds to {@code collectionType}
+     * @throws IllegalStateException wrapping any {@link HyracksDataException} raised while building
+     */
+    @Override
+    public Void visit(AbstractCollectionType collectionType, AbstractSchemaNode arg) {
+        // Casting to ArraySchemaNode would fail with a ClassCastException for a multiset node
+        AbstractCollectionSchemaNode collectionNode = (AbstractCollectionSchemaNode) arg;
+        IAType itemType = collectionType.getItemType();
+        columnMetadata.enterLevel(collectionNode);
+        try {
+            AbstractSchemaNode itemNode = collectionNode.getOrCreateItem(itemType.getTypeTag(), columnMetadata);
+            itemType.accept(this, itemNode);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+        columnMetadata.exitLevel(collectionNode);
+        return null;
+    }
+
+    /**
+     * Unions are rejected.
+     *
+     * @throws IllegalStateException always
+     */
+    @Override
+    public Void visit(AUnionType unionType, AbstractSchemaNode arg) {
+        throw new IllegalStateException(unionType.getTypeTag() + " is not a declared type");
+    }
+
+    /**
+     * Reaching a flat type while primary keys are still pending counts as one processed primary key.
+     */
+    @Override
+    public Void visitFlat(IAType flatType, AbstractSchemaNode arg) {
+        if (processedPrimaryKeys < primaryKeys.size()) {
+            processedPrimaryKeys++;
+        }
+        return null;
+    }
+
+    /*
+     * **************************************************************
+     * Handling primary keys and record fields conversion
+     * **************************************************************
+     */
+
+    /**
+     * At the root (or meta root), starts every pending primary-key path from its first step. At any other
+     * object, advances the current path by one step.
+     */
+    private void processPrimaryKeys(ARecordType recordType, ObjectSchemaNode objectNode)
+            throws HyracksDataException {
+        if (objectNode == columnMetadata.getRoot() || objectNode == columnMetadata.getMetaRoot()) {
+            // visitFlat() advances processedPrimaryKeys once the end of a path is reached
+            while (processedPrimaryKeys < primaryKeys.size()) {
+                currentPrimaryKeyPath = primaryKeys.get(processedPrimaryKeys);
+                currentPathIndex = 0;
+                processPrimaryKeyPath(recordType, objectNode);
+            }
+        } else {
+            currentPathIndex++;
+            processPrimaryKeyPath(recordType, objectNode);
+        }
+    }
+
+    /** Processes the field named by the current step of the current primary-key path. */
+    private void processPrimaryKeyPath(ARecordType recordType, ObjectSchemaNode objectNode)
+            throws HyracksDataException {
+        int fieldIndex = recordType.getFieldIndex(currentPrimaryKeyPath.get(currentPathIndex));
+        processField(fieldIndex, recordType, objectNode);
+    }
+
+    /**
+     * Gets or creates the child of {@code objectNode} for the field at {@code fieldIndex} and visits the
+     * field's type with that child.
+     */
+    private void processField(int fieldIndex, ARecordType recordType, ObjectSchemaNode objectNode)
+            throws HyracksDataException {
+        IAType[] fieldTypes = recordType.getFieldTypes();
+        String[] fieldNames = recordType.getFieldNames();
+        FieldNamesDictionary dictionary = columnMetadata.getFieldNamesDictionary();
+
+        int fieldNameIndex = dictionary.getOrCreateFieldNameIndex(fieldNames[fieldIndex]);
+        IValueReference fieldName = dictionary.getFieldName(fieldNameIndex);
+
+        IAType fieldType = fieldTypes[fieldIndex];
+        AbstractSchemaNode child = objectNode.getOrCreateChild(fieldName, fieldType.getTypeTag(), columnMetadata);
+
+        fieldType.accept(this, child);
+    }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
new file mode 100644
index 0000000000..8cd1e98e14
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.AbstractColumnMetadata;
+import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import
org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.MultisetSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import
org.apache.asterix.column.metadata.schema.visitor.SchemaBuilderFromIATypeVisitor;
+import org.apache.asterix.column.util.ColumnValuesUtil;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.column.values.writer.AbstractColumnValuesWriter;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+/**
+ * Flush column metadata belongs to a flushing {@link ILSMMemoryComponent}
+ * The schema here is mutable and can change according to the flushed records
+ */
+public final class FlushColumnMetadata extends AbstractColumnMetadata {
+ private final Map<AbstractSchemaNestedNode, RunLengthIntArray>
definitionLevels;
+ private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
+ private final FieldNamesDictionary fieldNamesDictionary;
+ private final ObjectSchemaNode root;
+ private final ObjectSchemaNode metaRoot;
+ private final IColumnValuesWriterFactory columnWriterFactory;
+ private final List<IColumnValuesWriter> columnWriters;
+ private final ArrayBackedValueStorage serializedMetadata;
+ private final PathInfoSerializer pathInfoSerializer;
+ private final IntArrayList nullWriterIndexes;
+ private final boolean metaContainsKeys;
+ private boolean changed;
+ private int level;
+ private int repeated;
+
+    /**
+     * Creates a fresh metadata whose initial schema is built from the declared dataset (and meta) types, and
+     * serializes it right away.
+     *
+     * @param datasetType         declared type of the dataset records
+     * @param metaType            declared type of the meta records, or {@code null} if there is none
+     * @param primaryKeys         primary key paths
+     * @param keySourceIndicator  only its first entry is read here (and only when {@code metaType} is not null):
+     *                            a value of 1 means the keys are in the meta record
+     * @param columnWriterFactory factory used for creating the column writers
+     * @param multiPageOpRef      reference to the multi-page operation
+     */
+    public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys,
+            List<Integer> keySourceIndicator, IColumnValuesWriterFactory columnWriterFactory,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef) throws HyracksDataException {
+        super(datasetType, metaType, primaryKeys.size());
+        this.multiPageOpRef = multiPageOpRef;
+        this.columnWriterFactory = columnWriterFactory;
+        this.definitionLevels = new HashMap<>();
+        this.columnWriters = new ArrayList<>();
+        this.level = -1;
+        this.repeated = 0;
+        this.fieldNamesDictionary = new FieldNamesDictionary();
+        this.root = new ObjectSchemaNode();
+        this.metaRoot = metaType != null ? new ObjectSchemaNode() : null;
+        this.pathInfoSerializer = new PathInfoSerializer();
+        this.nullWriterIndexes = new IntArrayList();
+        //The root always has definition levels
+        addDefinitionLevelsAndGet(root);
+        SchemaBuilderFromIATypeVisitor builder = new SchemaBuilderFromIATypeVisitor(this, primaryKeys);
+
+        //The type that holds the primary keys is visited first so that the keys take the first column indexes
+        this.metaContainsKeys = metaType != null && keySourceIndicator.get(0) == 1;
+        if (metaContainsKeys) {
+            addDefinitionLevelsAndGet(metaRoot);
+            metaType.accept(builder, metaRoot);
+        }
+        datasetType.accept(builder, root);
+        if (!metaContainsKeys && metaRoot != null) {
+            addDefinitionLevelsAndGet(metaRoot);
+            metaType.accept(builder, metaRoot);
+        }
+
+        //Mark as changed to force the initial serialization
+        this.serializedMetadata = new ArrayBackedValueStorage();
+        this.changed = true;
+        serializeColumnsMetadata();
+    }
+
+    /**
+     * Wraps already-built state (see {@code createMutableMetadata}). Nothing is re-serialized here: the given
+     * {@code serializedMetadata} is kept as is and the metadata starts as unchanged.
+     */
+    private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys,
+            boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> columnWriters,
+            FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
+            Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels,
+            ArrayBackedValueStorage serializedMetadata) {
+        super(datasetType, metaType, primaryKeys.size());
+        //State handed over by the caller
+        this.multiPageOpRef = multiPageOpRef;
+        this.columnWriterFactory = columnWriterFactory;
+        this.columnWriters = columnWriters;
+        this.fieldNamesDictionary = fieldNamesDictionary;
+        this.root = root;
+        this.metaRoot = metaRoot;
+        this.metaContainsKeys = metaContainsKeys;
+        this.definitionLevels = definitionLevels;
+        this.serializedMetadata = serializedMetadata;
+
+        //State owned by this instance
+        this.pathInfoSerializer = new PathInfoSerializer();
+        this.nullWriterIndexes = new IntArrayList();
+        this.level = -1;
+        this.repeated = 0;
+        this.changed = false;
+
+        //Register a new (empty) definition levels array for the root
+        addDefinitionLevelsAndGet(root);
+    }
+
+    /**
+     * @return the field names dictionary of this metadata
+     */
+    public FieldNamesDictionary getFieldNamesDictionary() {
+        return this.fieldNamesDictionary;
+    }
+
+    /**
+     * @return the root node of the dataset records' schema
+     */
+    public ObjectSchemaNode getRoot() {
+        return this.root;
+    }
+
+    /**
+     * @return the root node of the meta records' schema, or {@code null} when no meta type was provided
+     */
+    public ObjectSchemaNode getMetaRoot() {
+        return this.metaRoot;
+    }
+
+    /**
+     * @return the mutable reference that holds the current {@link IColumnWriteMultiPageOp}
+     */
+    public Mutable<IColumnWriteMultiPageOp> getMultiPageOpRef() {
+        return this.multiPageOpRef;
+    }
+
+    /**
+     * Returns the serialized metadata, re-serializing it first only if it was flagged as changed.
+     *
+     * @return the serialized metadata
+     * @throws HyracksDataException wrapping any {@link IOException} raised while serializing
+     */
+    @Override
+    public IValueReference serializeColumnsMetadata() throws HyracksDataException {
+        if (!changed) {
+            //Nothing changed since the last serialization
+            return serializedMetadata;
+        }
+
+        try {
+            serializeChanges();
+            changed = false;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return serializedMetadata;
+    }
+
+    /**
+     * Rewrites {@code serializedMetadata} from scratch. Five int slots are reserved at the beginning and patched
+     * later with the start position of each section. The sections are written in this order: column writers,
+     * field names, schema, meta schema (only if a meta root exists; otherwise its slot keeps the reserved -1),
+     * and path info.
+     */
+    private void serializeChanges() throws IOException {
+        serializedMetadata.reset();
+        DataOutput out = serializedMetadata.getDataOutput();
+
+        //Header: one reserved slot per section, in a fixed order
+        int writersSlot = reserveInt(out);
+        int fieldNamesSlot = reserveInt(out);
+        int schemaSlot = reserveInt(out);
+        int metaSchemaSlot = reserveInt(out);
+        int pathInfoSlot = reserveInt(out);
+
+        //Column writers: their count followed by each writer
+        setOffset(writersSlot);
+        int numberOfWriters = columnWriters.size();
+        out.writeInt(numberOfWriters);
+        for (int i = 0; i < numberOfWriters; i++) {
+            columnWriters.get(i).serialize(out);
+        }
+
+        //Field names
+        setOffset(fieldNamesSlot);
+        fieldNamesDictionary.serialize(out);
+
+        //Schema (the path info is collected while serializing the schema nodes)
+        pathInfoSerializer.reset();
+        setOffset(schemaSlot);
+        root.serialize(out, pathInfoSerializer);
+        if (metaRoot != null) {
+            //Meta schema
+            setOffset(metaSchemaSlot);
+            metaRoot.serialize(out, pathInfoSerializer);
+        }
+
+        //Path info
+        setOffset(pathInfoSlot);
+        pathInfoSerializer.serialize(out, getNumberOfColumns());
+    }
+
+    /**
+     * Writes a placeholder int (-1) to {@code output}.
+     *
+     * @param output the data output of {@code serializedMetadata}
+     * @return the length of {@code serializedMetadata} before the placeholder was written (i.e., its position)
+     */
+    private int reserveInt(DataOutput output) throws IOException {
+        int placeholderPosition = serializedMetadata.getLength();
+        output.writeInt(-1);
+        return placeholderPosition;
+    }
+
+    /**
+     * Stores the current length of {@code serializedMetadata} into the int located at {@code pointer}.
+     *
+     * @param pointer position of a previously reserved int (see {@code reserveInt})
+     */
+    private void setOffset(int pointer) {
+        byte[] bytes = serializedMetadata.getByteArray();
+        int currentLength = serializedMetadata.getLength();
+        IntegerPointable.setInteger(bytes, pointer, currentLength);
+    }
+
+    /**
+     * Creates a mutable metadata from a previously serialized one.
+     *
+     * @param keySourceIndicator only its first entry is read (and only when {@code metaType} is not null): a value
+     *                           of 1 means the keys are in the meta record
+     * @param serializedMetadata the serialized metadata to read from
+     * @return the deserialized metadata
+     * @throws HyracksDataException wrapping any {@link IOException} raised while deserializing
+     */
+    public static FlushColumnMetadata create(ARecordType datasetType, ARecordType metaType,
+            List<List<String>> primaryKeys, List<Integer> keySourceIndicator,
+            IColumnValuesWriterFactory columnWriterFactory, Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
+            IValueReference serializedMetadata) throws HyracksDataException {
+        boolean keysInMeta = metaType != null && keySourceIndicator.get(0) == 1;
+        FlushColumnMetadata metadata;
+        try {
+            metadata = createMutableMetadata(datasetType, metaType, primaryKeys, keysInMeta, columnWriterFactory,
+                    multiPageOpRef, serializedMetadata);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return metadata;
+    }
+
+    /**
+     * Deserializes the column writers, the field names dictionary, and the schema(s) from
+     * {@code serializedMetadata}, in that order, after skipping the leading offsets. The serialized bytes are
+     * also copied into a new storage that the returned metadata keeps as its serialized form.
+     */
+    private static FlushColumnMetadata createMutableMetadata(ARecordType datasetType, ARecordType metaType,
+            List<List<String>> primaryKeys, boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference serializedMetadata) throws IOException {
+        byte[] bytes = serializedMetadata.getByteArray();
+        int start = serializedMetadata.getStartOffset();
+        int length = serializedMetadata.getLength();
+        DataInput in = new DataInputStream(new ByteArrayInputStream(bytes, start, length));
+        //The offsets are not needed as the sections are read sequentially
+        in.skipBytes(OFFSETS_SIZE);
+
+        //Column writers
+        List<IColumnValuesWriter> deserializedWriters = new ArrayList<>();
+        deserializeWriters(in, deserializedWriters, columnWriterFactory);
+
+        //Field names
+        FieldNamesDictionary dictionary = FieldNamesDictionary.deserialize(in);
+
+        //Schema, followed by the meta schema (if any)
+        Map<AbstractSchemaNestedNode, RunLengthIntArray> defLevels = new HashMap<>();
+        ObjectSchemaNode deserializedRoot = (ObjectSchemaNode) AbstractSchemaNode.deserialize(in, defLevels);
+        ObjectSchemaNode deserializedMetaRoot =
+                metaType != null ? (ObjectSchemaNode) AbstractSchemaNode.deserialize(in, defLevels) : null;
+
+        //Keep a private copy of the serialized bytes
+        ArrayBackedValueStorage copy = new ArrayBackedValueStorage(serializedMetadata.getLength());
+        copy.append(serializedMetadata);
+        return new FlushColumnMetadata(datasetType, metaType, primaryKeys, metaContainsKeys, columnWriterFactory,
+                multiPageOpRef, deserializedWriters, dictionary, deserializedRoot, deserializedMetaRoot, defLevels,
+                copy);
+    }
+
+    /**
+     * Rolls the in-memory state back to the last serialized metadata.
+     *
+     * @throws HyracksDataException wrapping any {@link IOException} raised while reading the serialized metadata
+     */
+    @Override
+    public void abort() throws HyracksDataException {
+        //Read only the valid region of the storage, not the whole backing array
+        DataInputStream input = new DataInputStream(new ByteArrayInputStream(serializedMetadata.getByteArray(),
+                serializedMetadata.getStartOffset(), serializedMetadata.getLength()));
+        try {
+            //The serialized metadata starts with the reserved offsets (see serializeChanges). They have to be
+            //skipped (as createMutableMetadata does); otherwise, the first offset is read as the writers' count
+            input.skipBytes(OFFSETS_SIZE);
+            abort(input);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+ /**
+ * Resets the transient counters and re-reads the column writers, the field names dictionary, and the root
+ * schema from {@code input}, in that order.
+ *
+ * @param input stream over the serialized metadata; the reads below are sequential, so the stream must be
+ * positioned at the serialized number of column writers
+ */
+ private void abort(DataInputStream input) throws IOException {
+ level = -1;
+ repeated = 0;
+ changed = false;
+
+ columnWriters.clear();
+ deserializeWriters(input, columnWriters, columnWriterFactory);
+
+ fieldNamesDictionary.abort(input);
+ //NOTE(review): clear() also drops the definition levels of metaRoot (and its nested nodes), and only root
+ //is restored below -- confirm whether metaRoot needs to be restored here as well
+ //NOTE(review): nullWriterIndexes is not cleared here -- confirm that stale indexes cannot survive an abort
+ definitionLevels.clear();
+ root.abort(input, definitionLevels);
+ }
+
+ public static void deserializeWriters(DataInput input,
List<IColumnValuesWriter> writers,
+ IColumnValuesWriterFactory columnWriterFactory) throws IOException
{
+ int numberOfWriters = input.readInt();
+ for (int i = 0; i < numberOfWriters; i++) {
+ writers.add(AbstractColumnValuesWriter.deserialize(input,
columnWriterFactory));
+ }
+ }
+
+ /* ********************************************************
+ * Column values related methods
+ * ********************************************************
+ */
+
+    /**
+     * Sets the {@link IColumnWriteMultiPageOp} to be used by the {@link IColumnValuesWriter}s, then resets all
+     * existing writers.
+     *
+     * @param multiPageOp multi-buffer allocator
+     */
+    public void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+        multiPageOpRef.setValue(multiPageOp);
+
+        //Prepare every writer for the first write
+        for (IColumnValuesWriter writer : columnWriters) {
+            writer.reset();
+        }
+    }
+
+ public IColumnValuesWriter getWriter(int columnIndex) {
+ return columnWriters.get(columnIndex);
+ }
+
+ /* ********************************************************
+ * Schema related methods
+ * ********************************************************
+ */
+
+    /**
+     * @return the current nesting level (-1 when no nested node has been entered)
+     */
+    public int getLevel() {
+        return this.level;
+    }
+
+ @Override
+ public int getNumberOfColumns() {
+ return columnWriters.size();
+ }
+
+    /**
+     * Returns {@code child} if it can hold a value of (the normalized) {@code childTypeTag}; otherwise, creates
+     * a replacement node and flags the schema as changed.
+     * <p>
+     * An existing child is kept when the requested type is MISSING or NULL, when the child is a union, or when
+     * the child has the requested type.
+     *
+     * @param child        the current child, or {@code null} if there is none
+     * @param childTypeTag type of the value to be written
+     * @return the node to use for the value
+     */
+    public AbstractSchemaNode getOrCreateChild(AbstractSchemaNode child, ATypeTag childTypeTag)
+            throws HyracksDataException {
+        ATypeTag requestedTypeTag = getNormalizedTypeTag(childTypeTag);
+        if (child != null) {
+            if (requestedTypeTag == ATypeTag.MISSING || requestedTypeTag == ATypeTag.NULL) {
+                //MISSING and NULL do not require a node of their own type
+                return child;
+            }
+            ATypeTag existingTypeTag = child.getTypeTag();
+            if (existingTypeTag == ATypeTag.UNION || existingTypeTag == requestedTypeTag) {
+                return child;
+            }
+        }
+
+        //No child yet, or the requested type is different from the current child's type
+        AbstractSchemaNode createdChild = createChild(child, requestedTypeTag);
+        //Flag that the schema has changed
+        changed = true;
+        return createdChild;
+    }
+
+    /**
+     * Goes one level deeper. Entering a collection also increments the repeated counter.
+     *
+     * @param node the nested node being entered
+     */
+    public void enterLevel(AbstractSchemaNestedNode node) {
+        level += 1;
+        repeated += node.isCollection() ? 1 : 0;
+    }
+
+    /**
+     * Goes one level up. Exiting a collection also decrements the repeated counter.
+     *
+     * @param node the nested node being exited
+     */
+    public void exitLevel(AbstractSchemaNestedNode node) {
+        level -= 1;
+        repeated -= node.isCollection() ? 1 : 0;
+    }
+
+    /**
+     * Flushes the parent's pending definition levels to {@code node}, then enters {@code node} if it is an
+     * object or a collection.
+     *
+     * @param parent parent of {@code node}
+     * @param node   the node being entered
+     */
+    public void enterNode(AbstractSchemaNestedNode parent, AbstractSchemaNode node) throws HyracksDataException {
+        //Flush all definition levels from parent to child
+        flushDefinitionLevels(level, parent, node);
+        if (!node.isObjectOrCollection()) {
+            //Other nodes do not add a level
+            return;
+        }
+
+        //One more level for object, array, and multiset
+        level++;
+        if (node.isCollection()) {
+            //Tells nested values that they are repeated
+            repeated++;
+        }
+    }
+
+    /**
+     * Exits {@code node} and increments its counter. For a nested node, the current level is first appended to
+     * the node's definition levels.
+     *
+     * @param node the node being exited
+     */
+    public void exitNode(AbstractSchemaNode node) {
+        if (node.isNested()) {
+            //Add the nested node's level for all missing children (i.e., not entered for a record)
+            RunLengthIntArray nodeDefLevels = definitionLevels.get((AbstractSchemaNestedNode) node);
+            nodeDefLevels.add(level);
+            //Union nodes should not change the level as they are logical nodes
+            level -= node.isObjectOrCollection() ? 1 : 0;
+        }
+        node.incrementCounter();
+    }
+
+ public void exitCollectionNode(AbstractCollectionSchemaNode
collectionNode, int numberOfItems) {
+ RunLengthIntArray collectionDefLevels =
definitionLevels.get(collectionNode);
+ //Add delimiter
+ collectionDefLevels.add(level - 1);
+ level--;
+ repeated--;
+ collectionNode.incrementCounter();
+ }
+
+ /**
+ * Needed by {@link AbstractCollectionSchemaNode} to add the definition
level for each item
+ *
+ * @param collectionSchemaNode collection node
+ * @return collection node's definition level
+ */
+ public RunLengthIntArray getDefinitionLevels(AbstractCollectionSchemaNode
collectionSchemaNode) {
+ return definitionLevels.get(collectionSchemaNode);
+ }
+
+ public void clearDefinitionLevels(AbstractSchemaNestedNode nestedNode) {
+ definitionLevels.get(nestedNode).reset();
+ }
+
+    /**
+     * Propagates to {@code node} the parent's definition levels that {@code node} has not received yet.
+     * Nothing is done if there is no parent or if the node's counter already reached the size of the parent's
+     * definition levels.
+     *
+     * @param level  the parent's level
+     * @param parent parent of {@code node} (can be {@code null})
+     * @param node   the node to flush the definition levels to
+     */
+    public void flushDefinitionLevels(int level, AbstractSchemaNestedNode parent, AbstractSchemaNode node)
+            throws HyracksDataException {
+        if (parent == null) {
+            return;
+        }
+
+        RunLengthIntArray parentDefLevels = definitionLevels.get(parent);
+        if (node.getCounter() >= parentDefLevels.getSize()) {
+            //The node is up to date
+            return;
+        }
+
+        int parentMask = ColumnValuesUtil.getNullMask(level);
+        int childMask = ColumnValuesUtil.getNullMask(level + 1);
+        flushDefinitionLevels(parentMask, childMask, parentDefLevels, node);
+    }
+
+    /**
+     * Flushes the parent's definition levels, starting from the node's counter, either to the node's own
+     * definition levels (nested node) or to the node's column writer (primitive node). The node's counter is
+     * then set to the size of the parent's definition levels.
+     */
+    private void flushDefinitionLevels(int parentMask, int childMask, RunLengthIntArray parentDefLevels,
+            AbstractSchemaNode node) throws HyracksDataException {
+        int from = node.getCounter();
+        if (!node.isNested()) {
+            int columnIndex = ((PrimitiveSchemaNode) node).getColumnIndex();
+            flushWriterDefinitionLevels(parentMask, childMask, from, parentDefLevels, columnWriters.get(columnIndex));
+        } else {
+            RunLengthIntArray nodeDefLevels = definitionLevels.get((AbstractSchemaNestedNode) node);
+            flushNestedDefinitionLevel(parentMask, childMask, from, parentDefLevels, nodeDefLevels);
+        }
+        node.setCounter(parentDefLevels.getSize());
+    }
+
+    /**
+     * Appends to {@code childDefLevels} the child values of the parent's definition levels, starting from
+     * {@code startIndex}. Values are added block by block; only the first block may be added partially.
+     */
+    private void flushNestedDefinitionLevel(int parentMask, int childMask, int startIndex,
+            RunLengthIntArray parentDefLevels, RunLengthIntArray childDefLevels) {
+        if (parentDefLevels.getSize() == 0) {
+            return;
+        }
+
+        //startIndex might be in the middle of a block. Hence, the first block is handled separately
+        int firstBlock = parentDefLevels.getBlockIndex(startIndex);
+        //Number of values of the first block, starting from startIndex
+        int firstBlockCount = parentDefLevels.getBlockSize(firstBlock, startIndex);
+        int firstBlockParentValue = parentDefLevels.getBlockValue(firstBlock);
+        childDefLevels.add(ColumnValuesUtil.getChildValue(parentMask, childMask, firstBlockParentValue),
+                firstBlockCount);
+
+        //The remaining blocks are added in full
+        int block = firstBlock + 1;
+        while (block < parentDefLevels.getNumberOfBlocks()) {
+            int parentValue = parentDefLevels.getBlockValue(block);
+            int childValue = ColumnValuesUtil.getChildValue(parentMask, childMask, parentValue);
+            childDefLevels.add(childValue, parentDefLevels.getBlockSize(block));
+            block++;
+        }
+    }
+
+    /**
+     * Writes to {@code writer} the child values of the parent's definition levels, starting from
+     * {@code startIndex}. Levels are written block by block; only the first block may be written partially.
+     */
+    private void flushWriterDefinitionLevels(int parentMask, int childMask, int startIndex,
+            RunLengthIntArray parentDefLevels, IColumnValuesWriter writer) throws HyracksDataException {
+        if (parentDefLevels.getSize() == 0) {
+            return;
+        }
+
+        /*
+         * Only a fraction of the first block might be needed. Hence, the number of needed definition levels of
+         * the first block is determined first, and only those are written.
+         */
+        int firstBlock = parentDefLevels.getBlockIndex(startIndex);
+        int firstBlockCount = parentDefLevels.getBlockSize(firstBlock, startIndex);
+        int firstBlockParentValue = parentDefLevels.getBlockValue(firstBlock);
+        writer.writeLevels(ColumnValuesUtil.getChildValue(parentMask, childMask, firstBlockParentValue),
+                firstBlockCount);
+
+        //The remaining blocks are written in full
+        int block = firstBlock + 1;
+        while (block < parentDefLevels.getNumberOfBlocks()) {
+            int parentValue = parentDefLevels.getBlockValue(block);
+            int childValue = ColumnValuesUtil.getChildValue(parentMask, childMask, parentValue);
+            writer.writeLevels(childValue, parentDefLevels.getBlockSize(block));
+            block++;
+        }
+    }
+
+    /**
+     * Creates the node that replaces (or wraps) {@code child} for the requested type:
+     * <ul>
+     * <li>no previous child: a new node of the requested type</li>
+     * <li>previous child is NULL: a new node of the requested type that inherits the definition levels of the
+     * NULL column</li>
+     * <li>otherwise: a union of the previous child and a new node of the requested type</li>
+     * </ul>
+     */
+    private AbstractSchemaNode createChild(AbstractSchemaNode child, ATypeTag normalizedTypeTag)
+            throws HyracksDataException {
+        if (child == null) {
+            return createChild(normalizedTypeTag);
+        }
+
+        if (child.getTypeTag() != ATypeTag.NULL) {
+            //Different type. Make union
+            AbstractSchemaNode newBranch = createChild(normalizedTypeTag);
+            return addDefinitionLevelsAndGet(new UnionSchemaNode(child, newBranch));
+        }
+
+        //The previous child was a NULL. The new child needs to inherit the NULL definition levels
+        int nullColumnIndex = ((PrimitiveSchemaNode) child).getColumnIndex();
+        RunLengthIntArray nullDefLevels = columnWriters.get(nullColumnIndex).getDefinitionLevelsIntArray();
+        //Add the column index to be garbage collected (must precede the creation of the new child)
+        nullWriterIndexes.add(nullColumnIndex);
+        AbstractSchemaNode replacement = createChild(normalizedTypeTag);
+        int nullMask = ColumnValuesUtil.getNullMask(level);
+        flushDefinitionLevels(nullMask, nullMask, nullDefLevels, replacement);
+        return replacement;
+    }
+
+    /**
+     * Creates a new schema node for the given (normalized) type. Nested nodes get their definition levels
+     * registered; primitive nodes get a column writer.
+     *
+     * @throws IllegalStateException if the type is not supported
+     */
+    private AbstractSchemaNode createChild(ATypeTag normalizedTypeTag) throws HyracksDataException {
+        switch (normalizedTypeTag) {
+            case OBJECT:
+                return addDefinitionLevelsAndGet(new ObjectSchemaNode());
+            case ARRAY:
+                return addDefinitionLevelsAndGet(new ArraySchemaNode());
+            case MULTISET:
+                return addDefinitionLevelsAndGet(new MultisetSchemaNode());
+            case NULL:
+            case MISSING:
+            case BOOLEAN:
+            case DOUBLE:
+            case BIGINT:
+            case STRING:
+            case UUID:
+                return createPrimitiveChild(normalizedTypeTag);
+            default:
+                throw new IllegalStateException("Unsupported type " + normalizedTypeTag);
+        }
+    }
+
+    /**
+     * Creates a primitive node along with its column writer. The column index of a replaced NULL column is
+     * reused if one is available; otherwise, a new column is appended.
+     */
+    private AbstractSchemaNode createPrimitiveChild(ATypeTag normalizedTypeTag) throws HyracksDataException {
+        int columnIndex = nullWriterIndexes.isEmpty() ? columnWriters.size() : nullWriterIndexes.removeInt(0);
+        //The first columns are the primary keys
+        boolean primaryKey = columnIndex < getNumberOfPrimaryKeys();
+        boolean writeAlways = primaryKey || repeated > 0;
+        boolean filtered = !primaryKey;
+        int maxLevel = primaryKey ? 1 : level + 1;
+        IColumnValuesWriter writer =
+                columnWriterFactory.createValueWriter(normalizedTypeTag, columnIndex, maxLevel, writeAlways, filtered);
+        if (multiPageOpRef.getValue() != null) {
+            //A multi-page operation is set; reset the new writer as init() does for the existing writers
+            writer.reset();
+        }
+        addColumn(columnIndex, writer);
+        return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag, primaryKey);
+    }
+
+    /**
+     * Appends {@code writer} if {@code index} is the next free column index; otherwise, replaces the writer at
+     * {@code index}.
+     */
+    private void addColumn(int index, IColumnValuesWriter writer) {
+        if (index != columnWriters.size()) {
+            //Replace the writer of an existing column
+            columnWriters.set(index, writer);
+            return;
+        }
+        columnWriters.add(writer);
+    }
+
+    /**
+     * Registers a new (empty) definition levels array for {@code nestedNode}, replacing any previous one.
+     *
+     * @return {@code nestedNode}
+     */
+    private AbstractSchemaNode addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) {
+        RunLengthIntArray emptyDefLevels = new RunLengthIntArray();
+        definitionLevels.put(nestedNode, emptyDefLevels);
+        return nestedNode;
+    }
+
+ public static ATypeTag getNormalizedTypeTag(ATypeTag typeTag) {
+ switch (typeTag) {
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ return ATypeTag.BIGINT;
+ case FLOAT:
+ return ATypeTag.DOUBLE;
+ default:
+ return typeTag;
+ }
+ }
+
+    /**
+     * Dereferences the multi-page operation and closes all column writers.
+     */
+    public void close() {
+        //Dereference multiPageOp
+        multiPageOpRef.setValue(null);
+        for (IColumnValuesWriter writer : columnWriters) {
+            writer.close();
+        }
+    }
+
+ public void addNestedNull(AbstractSchemaNestedNode parent,
AbstractSchemaNestedNode node)
+ throws HyracksDataException {
+ //Flush all definition levels from parent to the current node
+ flushDefinitionLevels(level, parent, node);
+ //Add null value (+2) to say that both the parent and the child are
present
+ definitionLevels.get(node).add(ColumnValuesUtil.getNullMask(level + 2)
| level);
+ node.incrementCounter();
+ }
+}