Repository: asterixdb
Updated Branches:
  refs/heads/master 77109ea4b -> e856e1e8f


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
new file mode 100644
index 0000000..15f8a23
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+
+public class SecondaryCorrelatedInvertedIndexOperationsHelper extends 
SecondaryCorrelatedTreeIndexOperationsHelper {
+
+    private IAType secondaryKeyType;
+    private ITypeTraits[] invListsTypeTraits;
+    private IBinaryComparatorFactory[] tokenComparatorFactories;
+    private ITypeTraits[] tokenTypeTraits;
+    private IBinaryTokenizerFactory tokenizerFactory;
+    // For tokenization, sorting and loading. Represents <token, primary keys>.
+    private int numTokenKeyPairFields;
+    private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories;
+    private RecordDescriptor tokenKeyPairRecDesc;
+    private boolean isPartitioned;
+    private int[] invertedIndexFields;
+    private int[] invertedIndexFieldsForNonBulkLoadOps;
+    private int[] secondaryFilterFieldsForNonBulkLoadOps;
+
+    protected SecondaryCorrelatedInvertedIndexOperationsHelper(Dataset 
dataset, Index index,
+            PhysicalOptimizationConfig physOptConf, MetadataProvider 
metadataProvider) throws AlgebricksException {
+        super(dataset, index, physOptConf, metadataProvider);
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    protected void setSecondaryRecDescAndComparators() throws 
AlgebricksException {
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        IndexType indexType = index.getIndexType();
+        boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
+        // Sanity checks.
+        if (numPrimaryKeys > 1) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX,
+                    indexType, 
RecordUtil.toFullyQualifiedName(dataset.getDataverseName(), 
dataset.getDatasetName()));
+        }
+        if (numSecondaryKeys > 1) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, 
numSecondaryKeys,
+                    indexType, 1);
+        }
+        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+            isPartitioned = true;
+        } else {
+            isPartitioned = false;
+        }
+        // Prepare record descriptor used in the assign op, and the optional
+        // select op.
+        secondaryFieldAccessEvalFactories = new 
IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
+        ISerializerDeserializer[] secondaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys 
+ numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields = new 
ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + 
numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = 
FormatUtils.getDefaultFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = 
FormatUtils.getDefaultFormat().getTypeTraitProvider();
+        int recordColumn = NUM_TAG_FIELDS + numPrimaryKeys;
+        if (numSecondaryKeys > 0) {
+            secondaryFieldAccessEvalFactories[0] = 
FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory(
+                    isOverridingKeyFieldTypes ? enforcedItemType : itemType, 
index.getKeyFieldNames().get(0),
+                    recordColumn);
+            Pair<IAType, Boolean> keyTypePair = 
Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                    index.getKeyFieldNames().get(0), itemType);
+            secondaryKeyType = keyTypePair.first;
+            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || 
keyTypePair.second;
+            ISerializerDeserializer keySerde = 
serdeProvider.getSerializerDeserializer(secondaryKeyType);
+            secondaryRecFields[0] = keySerde;
+            secondaryTypeTraits[0] = 
typeTraitProvider.getTypeTrait(secondaryKeyType);
+        }
+        if (numFilterFields > 0) {
+            secondaryFieldAccessEvalFactories[numSecondaryKeys] = 
FormatUtils.getDefaultFormat()
+                    .getFieldAccessEvaluatorFactory(itemType, filterFieldName, 
recordColumn);
+            Pair<IAType, Boolean> keyTypePair = 
Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = keyTypePair.first;
+            ISerializerDeserializer serde = 
serdeProvider.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+        // Comparators and type traits for tokens.
+        int numTokenFields = (!isPartitioned) ? numSecondaryKeys : 
numSecondaryKeys + 1;
+        tokenComparatorFactories = new 
IBinaryComparatorFactory[numTokenFields];
+        tokenTypeTraits = new ITypeTraits[numTokenFields];
+        tokenComparatorFactories[0] = 
NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        tokenTypeTraits[0] = 
NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+        if (isPartitioned) {
+            // The partitioning field is hardcoded to be a short *without* an 
Asterix type tag.
+            tokenComparatorFactories[1] = 
PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+        }
+        // Set tokenizer factory.
+        // TODO: We might want to expose the hashing option at the AQL level,
+        // and add the choice to the index metadata.
+        tokenizerFactory = 
NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), 
indexType,
+                index.getGramLength());
+        // Type traits for inverted-list elements. Inverted lists contain
+        // primary keys.
+        invListsTypeTraits = new ITypeTraits[numPrimaryKeys];
+        if (numPrimaryKeys > 0) {
+            invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
+            enforcedRecFields[0] = primaryRecDesc.getFields()[0];
+            enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
+        }
+        enforcedRecFields[numPrimaryKeys] = 
serdeProvider.getSerializerDeserializer(itemType);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, 
enforcedTypeTraits);
+        // For tokenization, sorting and loading.
+        // One token (+ optional partitioning field) + primary keys.
+        numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + 
numPrimaryKeys;
+        ISerializerDeserializer[] tokenKeyPairFields =
+                new ISerializerDeserializer[numTokenKeyPairFields + 
numFilterFields];
+        ITypeTraits[] tokenKeyPairTypeTraits = new 
ITypeTraits[numTokenKeyPairFields];
+        tokenKeyPairComparatorFactories = new 
IBinaryComparatorFactory[numTokenKeyPairFields];
+        tokenKeyPairFields[0] = 
serdeProvider.getSerializerDeserializer(secondaryKeyType);
+        tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
+        tokenKeyPairComparatorFactories[0] = 
NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        int pkOff = 1;
+        if (isPartitioned) {
+            tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
+            tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
+            tokenKeyPairComparatorFactories[1] = 
PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            pkOff = 2;
+        }
+        if (numPrimaryKeys > 0) {
+            tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0];
+            tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0];
+            tokenKeyPairComparatorFactories[pkOff] = 
primaryComparatorFactories[0];
+        }
+        if (numFilterFields > 0) {
+            tokenKeyPairFields[numPrimaryKeys + pkOff] = 
secondaryRecFields[numPrimaryKeys + numSecondaryKeys];
+        }
+        tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, 
tokenKeyPairTypeTraits);
+        if (filterFieldName != null) {
+            invertedIndexFields = new int[numTokenKeyPairFields];
+            for (int i = 0; i < invertedIndexFields.length; i++) {
+                invertedIndexFields[i] = i;
+            }
+            secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields];
+            secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + 
numPrimaryKeys;
+            invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + 
numPrimaryKeys];
+            for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; 
i++) {
+                invertedIndexFieldsForNonBulkLoadOps[i] = i;
+            }
+        }
+    }
+
+    @Override
+    protected int getNumSecondaryKeys() {
+        return numTokenKeyPairFields - numPrimaryKeys;
+    }
+
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
+        JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
+        JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+
+        // Create dummy key provider for feeding the primary index scan.
+        IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+
+        // Create primary index scan op.
+        IOperatorDescriptor primaryScanOp = 
createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
+                
getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)),
 jobId);
+
+        IOperatorDescriptor sourceOp = primaryScanOp;
+        boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        if (isOverridingKeyFieldTypes && !enforcedItemType.equals(itemType)) {
+            sourceOp = createCastOp(spec, dataset.getDatasetType(), 
index.isEnforced());
+            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 
0, sourceOp, 0);
+        }
+
+        RecordDescriptor taggedSecondaryRecDesc = 
getTaggedRecordDescriptor(secondaryRecDesc);
+
+        RecordDescriptor taggedTokenKeyPairRecDesc = 
getTaggedRecordDescriptor(tokenKeyPairRecDesc);
+
+        AlgebricksMetaOperatorDescriptor asterixAssignOp =
+                createAssignOp(spec, numSecondaryKeys, taggedSecondaryRecDesc);
+
+        // Generate compensate tuples for upsert
+        IOperatorDescriptor processorOp =
+                createTupleProcessorOp(spec, taggedSecondaryRecDesc, 
numSecondaryKeys, numPrimaryKeys, true);
+
+        // Create a tokenizer op.
+        AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);
+
+        // Create a sort op
+        ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
+                
getTaggedSecondaryComparatorFactories(tokenKeyPairComparatorFactories), 
taggedTokenKeyPairRecDesc);
+
+        // Create secondary inverted index bulk load op.
+        AbstractSingleActivityOperatorDescriptor invIndexBulkLoadOp =
+                createTreeIndexBulkLoadOp(spec, metadataProvider, 
taggedTokenKeyPairRecDesc,
+                        createFieldPermutationForBulkLoadOp(), 
getNumSecondaryKeys(), numPrimaryKeys, true);
+
+        AlgebricksMetaOperatorDescriptor metaOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new 
RecordDescriptor[] {});
+        // Connect the operators.
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, 
primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, 
asterixAssignOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 
0, processorOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), processorOp, 0, 
tokenizerOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, 
sortOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, 
invIndexBulkLoadOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), 
invIndexBulkLoadOp, 0, metaOp, 0);
+        spec.addRoot(metaOp);
+        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    private AbstractOperatorDescriptor createTokenizerOp(JobSpecification 
spec) throws AlgebricksException {
+        int docField = NUM_TAG_FIELDS;
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        int[] keyFields = new int[NUM_TAG_FIELDS + numPrimaryKeys + 
numFilterFields];
+        // set tag fields
+        for (int i = 0; i < NUM_TAG_FIELDS; i++) {
+            keyFields[i] = i;
+        }
+        // set primary key + filter fields
+        for (int i = NUM_TAG_FIELDS; i < keyFields.length; i++) {
+            keyFields[i] = i + numSecondaryKeys;
+        }
+        BinaryTokenizerOperatorDescriptor tokenizerOp = new 
BinaryTokenizerOperatorDescriptor(spec,
+                getTaggedRecordDescriptor(tokenKeyPairRecDesc), 
tokenizerFactory, docField, keyFields, isPartitioned,
+                false, true, MissingWriterFactory.INSTANCE);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
tokenizerOp,
+                primaryPartitionConstraint);
+        return tokenizerOp;
+    }
+
+    @Override
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification 
spec,
+            IBinaryComparatorFactory[] taggedSecondaryComparatorFactories, 
RecordDescriptor taggedSecondaryRecDesc) {
+        /**
+         * after tokenization, the field layout becomes
+         * [token, num?, tag, primary key, filter value]
+         * we need to sort on
+         * [tag, token, num?, primary key]
+         */
+        int[] taggedSortFields = new 
int[taggedSecondaryComparatorFactories.length];
+        int numSecondaryKeys = getNumSecondaryKeys();
+        int idx = 0;
+        // set component pos fields
+        taggedSortFields[idx++] = numSecondaryKeys;
+        // set secondary keys
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            taggedSortFields[idx++] = i;
+        }
+        // set primary keys
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            taggedSortFields[idx++] = i + numSecondaryKeys + NUM_TAG_FIELDS;
+        }
+        ExternalSortOperatorDescriptor sortOp =
+                new ExternalSortOperatorDescriptor(spec, 
physOptConf.getMaxFramesExternalSort(), taggedSortFields,
+                        taggedSecondaryComparatorFactories, 
taggedSecondaryRecDesc);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
sortOp, primaryPartitionConstraint);
+        return sortOp;
+    }
+
+    @Override
+    protected int[] createFieldPermutationForBulkLoadOp() {
+        /**
+         * after tokenization, the field layout becomes
+         * [token, num?, tag, primary key, filter value]
+         * we need to restore it back to
+         * [tag, token, num?, primary key, filter value]
+         */
+        int[] fieldPermutation = new int[NUM_TAG_FIELDS + 
numTokenKeyPairFields + numFilterFields];
+        int numSecondaryKeys = getNumSecondaryKeys();
+        int idx = 0;
+        // set tag fields
+        for (int i = 0; i < NUM_TAG_FIELDS; i++) {
+            fieldPermutation[idx++] = i + numSecondaryKeys;
+        }
+        // set secondary keys
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            fieldPermutation[idx++] = i;
+        }
+        // set primary key + filter
+        for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) {
+            fieldPermutation[idx++] = i + NUM_TAG_FIELDS + numSecondaryKeys;
+        }
+        return fieldPermutation;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
new file mode 100644
index 0000000..8fd8a7a
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import 
org.apache.asterix.runtime.operators.LSMSecondaryIndexBulkLoadOperatorDescriptor;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+
+@SuppressWarnings("rawtypes")
+public class SecondaryCorrelatedRTreeOperationsHelper extends 
SecondaryCorrelatedTreeIndexOperationsHelper {
+
+    protected IPrimitiveValueProviderFactory[] valueProviderFactories;
+    protected int numNestedSecondaryKeyFields;
+    protected ATypeTag keyType;
+    protected int[] primaryKeyFields;
+    protected int[] rtreeFields;
+    protected boolean isPointMBR;
+    protected RecordDescriptor secondaryRecDescForPointMBR = null;
+
+    protected SecondaryCorrelatedRTreeOperationsHelper(Dataset dataset, Index 
index,
+            PhysicalOptimizationConfig physOptConf, MetadataProvider 
metadataProvider) throws AlgebricksException {
+        super(dataset, index, physOptConf, metadataProvider);
+    }
+
+    @Override
+    protected int getNumSecondaryKeys() {
+        return numNestedSecondaryKeyFields;
+    }
+
+    @Override
+    protected void setSecondaryRecDescAndComparators() throws 
AlgebricksException {
+        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
+        int numSecondaryKeys = secondaryKeyFields.size();
+        boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
+        if (numSecondaryKeys != 1) {
+            throw 
AsterixException.create(ErrorCode.INDEX_RTREE_MULTIPLE_FIELDS_NOT_ALLOWED, 
numSecondaryKeys);
+        }
+        Pair<IAType, Boolean> spatialTypePair =
+                
Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), 
secondaryKeyFields.get(0), itemType);
+        IAType spatialType = spatialTypePair.first;
+        anySecondaryKeyIsNullable = spatialTypePair.second;
+        isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || 
spatialType.getTypeTag() == ATypeTag.POINT3D;
+        int numDimensions = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        numNestedSecondaryKeyFields = numDimensions * 2;
+        int recordColumn = NUM_TAG_FIELDS + numPrimaryKeys;
+        secondaryFieldAccessEvalFactories =
+                
metadataProvider.getFormat().createMBRFactory(isOverridingKeyFieldTypes ? 
enforcedItemType : itemType,
+                        secondaryKeyFields.get(0), recordColumn, 
numDimensions, filterFieldName, isPointMBR);
+        secondaryComparatorFactories = new 
IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        valueProviderFactories = new 
IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        ISerializerDeserializer[] secondaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + 
numNestedSecondaryKeyFields + numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields = new 
ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + 
numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        IAType nestedKeyType = 
NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        keyType = nestedKeyType.getTypeTag();
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde =
+                    
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] =
+                    
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType,
 true);
+            secondaryTypeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] =
+                    
metadataProvider.getStorageComponentProvider().getPrimitiveValueProviderFactory();
+
+        }
+        // Add serializers and comparators for primary index fields.
+        // only support internal datasets
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numNestedSecondaryKeyFields + i] = 
primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = 
primaryRecDesc.getTypeTraits()[i];
+            enforcedRecFields[i] = primaryRecDesc.getFields()[i];
+            enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
+        }
+
+        enforcedRecFields[numPrimaryKeys] = 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, 
enforcedTypeTraits);
+        if (numFilterFields > 0) {
+            rtreeFields = new int[numNestedSecondaryKeyFields + 
numPrimaryKeys];
+            for (int i = 0; i < rtreeFields.length; i++) {
+                rtreeFields[i] = i;
+            }
+
+            Pair<IAType, Boolean> typePair = 
Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = typePair.first;
+            ISerializerDeserializer serde = 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numNestedSecondaryKeyFields] = 
serde;
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+        primaryKeyFields = new int[numPrimaryKeys];
+        for (int i = 0; i < primaryKeyFields.length; i++) {
+            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
+        }
+        if (isPointMBR) {
+            int numNestedSecondaryKeyFieldForPointMBR = 
numNestedSecondaryKeyFields / 2;
+            ISerializerDeserializer[] recFieldsForPointMBR = new 
ISerializerDeserializer[numPrimaryKeys
+                    + numNestedSecondaryKeyFieldForPointMBR + numFilterFields];
+            int idx = 0;
+            for (int i = 0; i < numNestedSecondaryKeyFieldForPointMBR; i++) {
+                recFieldsForPointMBR[idx++] = secondaryRecFields[i];
+            }
+            for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) {
+                recFieldsForPointMBR[idx++] = 
secondaryRecFields[numNestedSecondaryKeyFields + i];
+            }
+            secondaryRecDescForPointMBR = new 
RecordDescriptor(recFieldsForPointMBR);
+        }
+    }
+
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AsterixException, 
AlgebricksException {
+        /***************************************************
+         * [ About PointMBR Optimization ]
+         * Instead of storing a MBR(4 doubles) for a point(2 doubles) in RTree 
leaf node,
+         * PointMBR concept is introduced.
+         * PointMBR is a way to store a point as 2 doubles in RTree leaf node.
+         * This reduces RTree index size roughly in half.
+         * In order to fully benefit from the PointMBR concept, besides RTree,
+         * external sort operator during bulk-loading (from either data 
loading or index creation)
+         * must deal with point as 2 doubles instead of 4 doubles. Otherwise, 
external sort will suffer from twice as
+         * many doubles as it actually requires. For this purpose,
+         * PointMBR specific optimization logic is added as follows:
+         * 1) CreateMBR function in assign operator generates 2 doubles, 
instead of 4 doubles.
+         * 2) External sort operator sorts points represented with 2 doubles.
+         * 3) Bulk-loading in RTree takes 4 doubles by reading 2 doubles twice 
and then,
+         * do the same work as non-point MBR cases.
+         ***************************************************/
+        JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
+        int numNestedSecondaryKeFieldsConsideringPointMBR =
+                isPointMBR ? numNestedSecondaryKeyFields / 2 : 
numNestedSecondaryKeyFields;
+        RecordDescriptor secondaryRecDescConsideringPointMBR = isPointMBR
+                ? getTaggedRecordDescriptor(secondaryRecDescForPointMBR) : 
getTaggedRecordDescriptor(secondaryRecDesc);
+
+        boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
+
+        assert dataset.getDatasetType() == DatasetType.INTERNAL;
+
+        // Create dummy key provider for feeding the primary index scan.
+        IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+        JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+
+        // Create primary index scan op.
+        IOperatorDescriptor primaryScanOp = 
createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
+                
getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)),
 jobId);
+
+        // Assign op.
+        IOperatorDescriptor sourceOp = primaryScanOp;
+        if (isOverridingKeyFieldTypes && !enforcedItemType.equals(itemType)) {
+            sourceOp = createCastOp(spec, dataset.getDatasetType(), 
index.isEnforced());
+            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 
0, sourceOp, 0);
+        }
+
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec,
+                numNestedSecondaryKeFieldsConsideringPointMBR, 
secondaryRecDescConsideringPointMBR);
+
+        // Generate compensate tuples for upsert
+        IOperatorDescriptor processorOp = createTupleProcessorOp(spec, 
secondaryRecDescConsideringPointMBR,
+                numNestedSecondaryKeFieldsConsideringPointMBR, numPrimaryKeys, 
false);
+
+        ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
+                getTaggedSecondaryComparatorFactories(new 
IBinaryComparatorFactory[] {
+                        MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length) }),
+                secondaryRecDescConsideringPointMBR);
+
+        // Create secondary RTree bulk load op.
+        LSMSecondaryIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = 
createTreeIndexBulkLoadOp(spec,
+                metadataProvider, secondaryRecDescConsideringPointMBR, 
createFieldPermutationForBulkLoadOp(),
+                numNestedSecondaryKeFieldsConsideringPointMBR, numPrimaryKeys, 
false);
+
+        AlgebricksMetaOperatorDescriptor metaOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new 
RecordDescriptor[] {});
+
+        // Connect the operators.
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, 
primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, 
asterixAssignOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 
0, processorOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), processorOp, 0, 
sortOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, 
secondaryBulkLoadOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), 
secondaryBulkLoadOp, 0, metaOp, 0);
+        spec.addRoot(metaOp);
+        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
+
+        return spec;
+    }
+
+    @Override
+    protected int[] createFieldPermutationForBulkLoadOp() {
+        if (isPointMBR) {
+            int[] fieldPermutation =
+                    new int[NUM_TAG_FIELDS + numNestedSecondaryKeyFields + 
numPrimaryKeys + numFilterFields];
+            int idx = 0;
+            int numSecondaryKeyFieldsForPointMBR = numNestedSecondaryKeyFields 
/ 2;
+            for (int i = 0; i < NUM_TAG_FIELDS + 
numSecondaryKeyFieldsForPointMBR; i++) {
+                fieldPermutation[idx++] = i;
+            }
+            //add the rest of the sk fields for pointMBR
+            for (int i = 0; i < numSecondaryKeyFieldsForPointMBR; i++) {
+                fieldPermutation[idx++] = NUM_TAG_FIELDS + i;
+            }
+            //add the pk and filter fields
+            int end = numSecondaryKeyFieldsForPointMBR + numPrimaryKeys + 
numFilterFields;
+            for (int i = numSecondaryKeyFieldsForPointMBR; i < end; i++) {
+                fieldPermutation[idx++] = NUM_TAG_FIELDS + i;
+            }
+            return fieldPermutation;
+        } else {
+            return super.createFieldPermutationForBulkLoadOp();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
index fa33790..171d72e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
@@ -263,8 +263,8 @@ public abstract class 
SecondaryCorrelatedTreeIndexOperationsHelper extends Secon
     }
 
     protected LSMSecondaryIndexBulkLoadOperatorDescriptor 
createTreeIndexBulkLoadOp(JobSpecification spec,
-            MetadataProvider metadataProvider, RecordDescriptor 
taggedSecondaryRecDesc, int numSecondaryKeys,
-            int numPrimaryKeys, boolean hasBuddyBtree) throws 
AlgebricksException {
+            MetadataProvider metadataProvider, RecordDescriptor 
taggedSecondaryRecDesc, int[] fieldPermutation,
+            int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBtree) 
throws AlgebricksException {
         IndexDataflowHelperFactory primaryIndexHelperFactory = new 
IndexDataflowHelperFactory(
                 
metadataProvider.getStorageComponentProvider().getStorageManager(), 
primaryFileSplitProvider);
 
@@ -273,7 +273,8 @@ public abstract class 
SecondaryCorrelatedTreeIndexOperationsHelper extends Secon
 
         LSMSecondaryIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp =
                 new LSMSecondaryIndexBulkLoadOperatorDescriptor(spec, 
taggedSecondaryRecDesc, primaryIndexHelperFactory,
-                        secondaryIndexHelperFactory, NUM_TAG_FIELDS, 
numSecondaryKeys, numPrimaryKeys, hasBuddyBtree);
+                        secondaryIndexHelperFactory, fieldPermutation, 
NUM_TAG_FIELDS, numSecondaryKeys,
+                        numPrimaryKeys, hasBuddyBtree);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
         return treeIndexBulkLoadOp;
@@ -306,14 +307,16 @@ public abstract class 
SecondaryCorrelatedTreeIndexOperationsHelper extends Secon
                         new SecondaryCorrelatedBTreeOperationsHelper(dataset, 
index, physOptConf, metadataProvider);
                 break;
             case RTREE:
-                //TODO RTree
+                indexOperationsHelper =
+                        new SecondaryCorrelatedRTreeOperationsHelper(dataset, 
index, physOptConf, metadataProvider);
+                break;
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
             case LENGTH_PARTITIONED_NGRAM_INVIX:
-                //TODO Inverted Index
-                //TODO This will be fixed soon
-                throw new UnsupportedOperationException();
+                indexOperationsHelper = new 
SecondaryCorrelatedInvertedIndexOperationsHelper(dataset, index,
+                        physOptConf, metadataProvider);
+                break;
             default:
                 throw new 
CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, 
index.getIndexType());
         }
@@ -321,4 +324,12 @@ public abstract class 
SecondaryCorrelatedTreeIndexOperationsHelper extends Secon
         return indexOperationsHelper;
     }
 
+    protected int[] createFieldPermutationForBulkLoadOp() {
+        int[] fieldPermutation = new int[NUM_TAG_FIELDS + 
getNumSecondaryKeys() + numPrimaryKeys + numFilterFields];
+        for (int i = 0; i < fieldPermutation.length; i++) {
+            fieldPermutation[i] = i;
+        }
+        return fieldPermutation;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index cb15b98..b4d8a22 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -23,6 +23,7 @@ import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
@@ -213,8 +214,8 @@ public class SecondaryInvertedIndexOperationsHelper extends 
SecondaryTreeIndexOp
         IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
-        IOperatorDescriptor primaryScanOp = 
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
-                jobId);
+        IOperatorDescriptor primaryScanOp =
+                DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, 
dataset, jobId);
 
         IOperatorDescriptor sourceOp = primaryScanOp;
         boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
@@ -270,8 +271,9 @@ public class SecondaryInvertedIndexOperationsHelper extends 
SecondaryTreeIndexOp
         for (int i = 0; i < primaryKeyFields.length; i++) {
             primaryKeyFields[i] = numSecondaryKeys + i;
         }
-        BinaryTokenizerOperatorDescriptor tokenizerOp = new 
BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc,
-                tokenizerFactory, docField, primaryKeyFields, isPartitioned, 
false);
+        BinaryTokenizerOperatorDescriptor tokenizerOp =
+                new BinaryTokenizerOperatorDescriptor(spec, 
tokenKeyPairRecDesc, tokenizerFactory, docField,
+                        primaryKeyFields, isPartitioned, false, false, 
MissingWriterFactory.INSTANCE);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
tokenizerOp,
                 primaryPartitionConstraint);
         return tokenizerOp;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java
index 6b050af..0d71f71 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java
@@ -24,7 +24,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.BooleanPointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
@@ -33,9 +32,6 @@ public abstract class 
AbstractLSMSecondaryIndexCreationNodePushable
     protected final IHyracksTaskContext ctx;
     protected final RecordDescriptor inputRecDesc;
 
-    // with tag fields
-    protected final FrameTupleReference tuple;
-
     protected final int partition;
     protected final int numTagFields;
     protected final int numSecondaryKeys;
@@ -50,7 +46,6 @@ public abstract class 
AbstractLSMSecondaryIndexCreationNodePushable
             boolean hasBuddyBTree) {
         this.ctx = ctx;
         this.inputRecDesc = inputRecDesc;
-        this.tuple = new FrameTupleReference();
 
         this.partition = partition;
         this.numTagFields = numTagFields;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index f09bcd2..2eb84fb 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import 
org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
@@ -45,7 +46,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
  */
 public class LSMSecondaryIndexBulkLoadNodePushable extends 
AbstractLSMSecondaryIndexCreationNodePushable {
     // with tag fields
-
+    private final PermutingFrameTupleReference tuple;
     private final PermutingTupleReference sourceTuple;
     private final PermutingTupleReference deletedKeyTuple;
 
@@ -63,16 +64,17 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends 
AbstractLSMSecondaryI
 
     public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int 
partition, RecordDescriptor inputRecDesc,
             IIndexDataflowHelperFactory primaryIndexHelperFactory,
-            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int 
numTagFields, int numSecondaryKeys,
-            int numPrimaryKeys, boolean hasBuddyBTree) throws 
HyracksDataException {
+            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] 
fieldPermutation, int numTagFields,
+            int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) 
throws HyracksDataException {
         super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, 
numPrimaryKeys, hasBuddyBTree);
 
         this.primaryIndexHelper =
                 
primaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partition);
         this.secondaryIndexHelper =
                 
secondaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partition);
+        this.tuple = new PermutingFrameTupleReference(fieldPermutation);
 
-        int[] sourcePermutation = new int[inputRecDesc.getFieldCount() - 
numTagFields];
+        int[] sourcePermutation = new int[fieldPermutation.length - 
numTagFields];
         for (int i = 0; i < sourcePermutation.length; i++) {
             sourcePermutation[i] = i + numTagFields;
         }
@@ -170,7 +172,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends 
AbstractLSMSecondaryI
                     loadNewComponent(componentPos);
                     currentComponentPos = componentPos;
                 }
-
                 if (isAntiMatterTuple(tuple)) {
                     addAntiMatterTuple(tuple);
                 } else {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
index 804b700..4f1dfd7 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
@@ -35,6 +35,7 @@ public class LSMSecondaryIndexBulkLoadOperatorDescriptor 
extends AbstractSingleA
     private final IIndexDataflowHelperFactory primaryIndexHelperFactory;
     private final IIndexDataflowHelperFactory secondaryIndexHelperFactory;
 
+    private final int[] fieldPermutation;
     private final int numTagFields;
     private final int numSecondaryKeys;
     private final int numPrimaryKeys;
@@ -43,12 +44,13 @@ public class LSMSecondaryIndexBulkLoadOperatorDescriptor 
extends AbstractSingleA
 
     public 
LSMSecondaryIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor outRecDesc,
             IIndexDataflowHelperFactory primaryIndexHelperFactory,
-            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int 
numTagFields, int numSecondaryKeys,
-            int numPrimaryKeys, boolean hasBuddyBTree) {
+            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] 
fieldPermutation, int numTagFields,
+            int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) {
         super(spec, 1, 1);
         this.outRecDescs[0] = outRecDesc;
         this.primaryIndexHelperFactory = primaryIndexHelperFactory;
         this.secondaryIndexHelperFactory = secondaryIndexHelperFactory;
+        this.fieldPermutation = fieldPermutation;
         this.numTagFields = numTagFields;
         this.numSecondaryKeys = numSecondaryKeys;
         this.numPrimaryKeys = numPrimaryKeys;
@@ -61,6 +63,7 @@ public class LSMSecondaryIndexBulkLoadOperatorDescriptor 
extends AbstractSingleA
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
         return new LSMSecondaryIndexBulkLoadNodePushable(ctx, partition,
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), primaryIndexHelperFactory,
-                secondaryIndexHelperFactory, numTagFields, numSecondaryKeys, 
numPrimaryKeys, hasBuddyBTree);
+                secondaryIndexHelperFactory, fieldPermutation, numTagFields, 
numSecondaryKeys, numPrimaryKeys,
+                hasBuddyBTree);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
index b2a2fa4..9376d1b 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
@@ -36,6 +36,7 @@ import 
org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import 
org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
 import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
@@ -69,6 +70,7 @@ import 
org.apache.hyracks.dataflow.std.base.AbstractStateObject;
  * [component pos, anti-matter flag, secondary keys, primary keys, filter 
values]
  */
 public class LSMSecondaryIndexCreationTupleProcessorNodePushable extends 
AbstractLSMSecondaryIndexCreationNodePushable {
+    private final FrameTupleReference tuple = new FrameTupleReference();
     // prevSourceTuple stores the previous matter tuple
     private final ArrayTupleBuilder prevMatterTupleBuilder;
     private final ArrayTupleReference prevMatterTuple = new 
ArrayTupleReference();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
index fe9decd..3ed47e6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
@@ -21,6 +21,7 @@ package 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -47,8 +48,13 @@ public class BinaryTokenizerOperatorDescriptor extends 
AbstractSingleActivityOpe
     // False: [token, number of token(if a partitioned index), keyfield1, 
keyfield2 ...]
     private final boolean writeKeyFieldsFirst;
 
+    private final boolean writeMissing;
+
+    private final IMissingWriterFactory missingWriterFactory;
+
     public BinaryTokenizerOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor recDesc,
-            IBinaryTokenizerFactory tokenizerFactory, int docField, int[] 
keyFields, boolean addNumTokensKey, boolean writeKeyFieldsFirst) {
+            IBinaryTokenizerFactory tokenizerFactory, int docField, int[] 
keyFields, boolean addNumTokensKey,
+            boolean writeKeyFieldsFirst, boolean writeMissing, 
IMissingWriterFactory missingWriterFactory) {
         super(spec, 1, 1);
         this.tokenizerFactory = tokenizerFactory;
         this.docField = docField;
@@ -56,13 +62,16 @@ public class BinaryTokenizerOperatorDescriptor extends 
AbstractSingleActivityOpe
         this.addNumTokensKey = addNumTokensKey;
         outRecDescs[0] = recDesc;
         this.writeKeyFieldsFirst = writeKeyFieldsFirst;
+        this.writeMissing = writeMissing;
+        this.missingWriterFactory = missingWriterFactory;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
-        return new BinaryTokenizerOperatorNodePushable(ctx, 
recordDescProvider.getInputRecordDescriptor(
-                getActivityId(), 0), outRecDescs[0], 
tokenizerFactory.createTokenizer(), docField, keyFields,
-                addNumTokensKey, writeKeyFieldsFirst);
+        return new BinaryTokenizerOperatorNodePushable(ctx,
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), outRecDescs[0],
+                tokenizerFactory.createTokenizer(), docField, keyFields, 
addNumTokensKey, writeKeyFieldsFirst,
+                writeMissing, missingWriterFactory);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 002457b..3df185a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -24,6 +24,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriter;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.GrowableArray;
@@ -31,6 +33,8 @@ import 
org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
@@ -45,6 +49,9 @@ public class BinaryTokenizerOperatorNodePushable extends 
AbstractUnaryInputUnary
     private final boolean writeKeyFieldsFirst;
     private final RecordDescriptor inputRecDesc;
     private final RecordDescriptor outputRecDesc;
+    private final boolean writeMissing;
+    private final IMissingWriter missingWriter;
+    private final FrameTupleReference tuple = new FrameTupleReference();
 
     private FrameTupleAccessor accessor;
     private ArrayTupleBuilder builder;
@@ -53,7 +60,8 @@ public class BinaryTokenizerOperatorNodePushable extends 
AbstractUnaryInputUnary
 
     public BinaryTokenizerOperatorNodePushable(IHyracksTaskContext ctx, 
RecordDescriptor inputRecDesc,
             RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int 
docField, int[] keyFields,
-            boolean addNumTokensKey, boolean writeKeyFieldsFirst) {
+            boolean addNumTokensKey, boolean writeKeyFieldsFirst, boolean 
writeMissing,
+            IMissingWriterFactory missingWriterFactory) {
         this.ctx = ctx;
         this.tokenizer = tokenizer;
         this.docField = docField;
@@ -62,6 +70,8 @@ public class BinaryTokenizerOperatorNodePushable extends 
AbstractUnaryInputUnary
         this.inputRecDesc = inputRecDesc;
         this.outputRecDesc = outputRecDesc;
         this.writeKeyFieldsFirst = writeKeyFieldsFirst;
+        this.writeMissing = writeMissing;
+        this.missingWriter = missingWriterFactory.createMissingWriter();
     }
 
     @Override
@@ -79,76 +89,85 @@ public class BinaryTokenizerOperatorNodePushable extends 
AbstractUnaryInputUnary
         int tupleCount = accessor.getTupleCount();
 
         for (int i = 0; i < tupleCount; i++) {
-            short numTokens = 0;
+            tuple.reset(accessor, i);
 
-            tokenizer.reset(accessor.getBuffer().array(), 
accessor.getTupleStartOffset(i)
-                    + accessor.getFieldSlotsLength() + 
accessor.getFieldStartOffset(i, docField),
-                    accessor.getFieldLength(i, docField));
+            short numTokens = 0;
 
-            if (addNumTokensKey) {
-                // Get the total number of tokens.
-                numTokens = tokenizer.getTokensCount();
+            if (!isDocFieldMissing(tuple)) {
+                tokenizer.reset(tuple.getFieldData(docField), 
tuple.getFieldStart(docField),
+                        tuple.getFieldLength(docField));
+                if (addNumTokensKey) {
+                    // Get the total number of tokens.
+                    numTokens = tokenizer.getTokensCount();
+                }
+                // Write token and data into frame by following the order 
specified
+                // in the writeKeyFieldsFirst field.
+                while (tokenizer.hasNext()) {
+                    tokenizer.next();
+                    IToken token = tokenizer.getToken();
+                    writeTuple(token, numTokens, i);
+                }
+            } else if (writeMissing) {
+                writeTuple(null, 0, i);
             }
+        }
 
-            // Write token and data into frame by following the order specified
-            // in the writeKeyFieldsFirst field.
-            while (tokenizer.hasNext()) {
-
-                tokenizer.next();
-
-                builder.reset();
-
-                // Writing Order: token, number of token, keyfield1 ... n
-                if (!writeKeyFieldsFirst) {
-                    try {
-                        IToken token = tokenizer.getToken();
-                        token.serializeToken(builderData);
-
-                        builder.addFieldEndOffset();
-                        // Add number of tokens if requested.
-                        if (addNumTokensKey) {
-                            builder.getDataOutput().writeShort(numTokens);
-                            builder.addFieldEndOffset();
-                        }
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e.getMessage());
-                    }
-
-                    for (int k = 0; k < keyFields.length; k++) {
-                        builder.addField(accessor, i, keyFields[k]);
-                    }
+    }
 
+    private void writeTuple(IToken token, int numTokens, int fieldIdx) throws 
HyracksDataException {
+        builder.reset();
+
+        // Writing Order: token, number of token, keyfield1 ... n
+        if (!writeKeyFieldsFirst) {
+            try {
+                if (token != null) {
+                    token.serializeToken(builderData);
+                    builder.addFieldEndOffset();
+                } else {
+                    missingWriter.writeMissing(builder.getDataOutput());
+                    builder.addFieldEndOffset();
                 }
-                // Writing Order: keyfield1 ... n, token, number of token
-                else {
-
-                    for (int k = 0; k < keyFields.length; k++) {
-                        builder.addField(accessor, i, keyFields[k]);
-                    }
-
-                    try {
-                        IToken token = tokenizer.getToken();
-                        token.serializeToken(builderData);
-
-                        builder.addFieldEndOffset();
-                        // Add number of tokens if requested.
-                        if (addNumTokensKey) {
-                            builder.getDataOutput().writeShort(numTokens);
-                            builder.addFieldEndOffset();
-                        }
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e.getMessage());
-                    }
 
+                // Add number of tokens if requested.
+                if (addNumTokensKey) {
+                    builder.getDataOutput().writeShort(numTokens);
+                    builder.addFieldEndOffset();
                 }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
 
-                FrameUtils.appendToWriter(writer, appender, 
builder.getFieldEndOffsets(), builder.getByteArray(), 0,
-                        builder.getSize());
+            for (int k = 0; k < keyFields.length; k++) {
+                builder.addField(accessor, fieldIdx, keyFields[k]);
+            }
 
+        }
+        // Writing Order: keyfield1 ... n, token, number of token
+        else {
+            for (int k = 0; k < keyFields.length; k++) {
+                builder.addField(accessor, fieldIdx, keyFields[k]);
             }
 
+            try {
+                if (token != null) {
+                    token.serializeToken(builderData);
+                    builder.addFieldEndOffset();
+                } else {
+                    missingWriter.writeMissing(builder.getDataOutput());
+                    builder.addFieldEndOffset();
+                }
+                // Add number of tokens if requested.
+                if (addNumTokensKey) {
+                    builder.getDataOutput().writeShort(numTokens);
+                    builder.addFieldEndOffset();
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
         }
 
+        FrameUtils.appendToWriter(writer, appender, 
builder.getFieldEndOffsets(), builder.getByteArray(), 0,
+                builder.getSize());
     }
 
     @Override
@@ -169,4 +188,14 @@ public class BinaryTokenizerOperatorNodePushable extends 
AbstractUnaryInputUnary
     public void flush() throws HyracksDataException {
         appender.flush(writer);
     }
+
+    /**
+     * Returns whether the doc field is missing (only with a type tag)
+     *
+     * @param tuple
+     * @return
+     */
+    private boolean isDocFieldMissing(ITupleReference tuple) {
+        return tuple.getFieldLength(docField) <= 1;
+    }
 }

Reply via email to