Improve reading from and writing to metadata pages

This change introduces a new interface, IComponentMetadata. Each LSM component is associated with a metadata object, which can be used to read and write arbitrary data to the metadata pages of that component. When a component is flushed, the data in its metadata object is automatically written to the resulting disk component. For merge operations, the IO callback is responsible for merging the components' metadata pages.
Change-Id: Id95ef33c0a0bc1abb3fc3ecdea5611ee4acd6dfa Reviewed-on: https://asterix-gerrit.ics.uci.edu/1476 Tested-by: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> Integration-Tests: Ian Maxon <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/d718dc4a Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/d718dc4a Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/d718dc4a Branch: refs/heads/master Commit: d718dc4a7fbfea5b50850f82f3f4abc01bc0e841 Parents: a8d961d Author: Abdullah Alamoudi <[email protected]> Authored: Thu Feb 2 21:41:48 2017 -0800 Committer: abdullah alamoudi <[email protected]> Committed: Fri Feb 3 07:56:03 2017 -0800 ---------------------------------------------------------------------- .../api/http/servlet/ConnectorApiLetTest.java | 180 ---------------- .../http/servlet/ConnectorApiServletTest.java | 180 ++++++++++++++++ .../api/http/servlet/VersionApiLetTest.java | 120 ----------- .../api/http/servlet/VersionApiServletTest.java | 120 +++++++++++ .../asterix/test/dataflow/LogMarkerTest.java | 10 +- .../context/CorrelatedPrefixMergePolicy.java | 22 +- .../context/PrimaryIndexOperationTracker.java | 4 +- .../asterix/common/dataflow/LSMIndexUtil.java | 13 +- .../common/exceptions/RuntimeDataException.java | 1 + .../AbstractLSMIOOperationCallback.java | 18 +- .../LSMBTreeIOOperationCallback.java | 25 ++- .../LSMBTreeIOOperationCallbackFactory.java | 2 +- .../LSMBTreeWithBuddyIOOperationCallback.java | 15 +- ...TreeWithBuddyIOOperationCallbackFactory.java | 2 +- .../LSMInvertedIndexIOOperationCallback.java | 18 +- ...InvertedIndexIOOperationCallbackFactory.java | 2 +- .../LSMRTreeIOOperationCallback.java | 15 +- .../LSMRTreeIOOperationCallbackFactory.java | 2 +- .../PrimaryIndexLogMarkerCallback.java | 83 ++++++- .../indexing/ExternalFileIndexAccessor.java | 26 +-- 
asterixdb/asterix-lang-sqlpp/pom.xml | 18 +- .../pom.xml | 10 +- .../metadata/bootstrap/MetadataBootstrap.java | 8 +- .../management/ReplicationManager.java | 50 ++--- .../ExternalBTreeLocalResourceMetadata.java | 15 +- ...rnalBTreeWithBuddyLocalResourceMetadata.java | 20 +- .../ExternalRTreeLocalResourceMetadata.java | 11 +- .../resource/LSMBTreeLocalResourceMetadata.java | 22 +- .../LSMInvertedIndexLocalResourceMetadata.java | 4 +- .../resource/LSMRTreeLocalResourceMetadata.java | 4 +- .../hyracks/algebricks/common/utils/Pair.java | 12 +- .../hyracks/algebricks/common/utils/Triple.java | 11 +- .../physical/AbstractStableSortPOperator.java | 1 - .../physical/OneToOneExchangePOperator.java | 2 +- .../physical/RandomMergeExchangePOperator.java | 2 +- .../algebricks/algebricks-rewriter/pom.xml | 7 +- .../hyracks/api/exceptions/ErrorCode.java | 1 + .../src/main/resources/errormsg/en.properties | 21 +- .../hyracks/data/std/api/IValueReference.java | 6 +- .../data/std/primitive/LongPointable.java | 21 +- .../data/std/util/ArrayBackedValueStorage.java | 20 ++ .../hyracks/data/std/util/GrowableArray.java | 29 +++ .../tests/integration/JobFailureTest.java | 9 +- ...onOnCreatePushRuntimeOperatorDescriptor.java | 5 + .../http/server/ChunkedNettyOutputStream.java | 17 +- .../am/btree/dataflow/BTreeDataflowHelper.java | 18 +- .../storage/am/btree/util/BTreeUtils.java | 10 +- .../AppendOnlyLinkedMetadataPageManager.java | 17 +- .../dataflow/ExternalBTreeDataflowHelper.java | 2 +- .../ExternalBTreeWithBuddyDataflowHelper.java | 2 +- .../btree/dataflow/LSMBTreeDataflowHelper.java | 2 +- .../am/lsm/btree/impls/ExternalBTree.java | 104 ++++----- .../lsm/btree/impls/ExternalBTreeOpContext.java | 15 +- .../lsm/btree/impls/ExternalBTreeWithBuddy.java | 118 +++++----- .../impls/ExternalBTreeWithBuddyOpContext.java | 15 +- .../storage/am/lsm/btree/impls/LSMBTree.java | 166 +++++++------- .../lsm/btree/impls/LSMBTreeDiskComponent.java | 12 +- .../impls/LSMBTreeDiskComponentFactory.java | 
17 +- .../lsm/btree/impls/LSMBTreeFlushOperation.java | 18 +- .../btree/impls/LSMBTreeMemoryComponent.java | 16 +- .../lsm/btree/impls/LSMBTreeMergeOperation.java | 10 +- .../am/lsm/btree/impls/LSMBTreeOpContext.java | 22 +- .../impls/LSMBTreeWithBuddyDiskComponent.java | 12 +- .../LSMBTreeWithBuddyDiskComponentFactory.java | 34 +-- .../impls/LSMBTreeWithBuddyMemoryComponent.java | 14 +- .../impls/LSMBTreeWithBuddyMergeOperation.java | 24 +-- .../hyracks-storage-am-lsm-common/pom.xml | 4 + .../am/lsm/common/api/IComponentMetadata.java | 54 +++++ .../am/lsm/common/api/ILSMComponent.java | 87 +++++++- .../am/lsm/common/api/ILSMComponentFactory.java | 31 --- .../am/lsm/common/api/ILSMComponentFilter.java | 13 +- .../common/api/ILSMComponentFilterFactory.java | 3 +- .../api/ILSMComponentFilterFrameFactory.java | 3 +- .../common/api/ILSMComponentFilterManager.java | 8 +- .../am/lsm/common/api/ILSMDiskComponent.java | 49 +++++ .../common/api/ILSMDiskComponentFactory.java | 36 ++++ .../storage/am/lsm/common/api/ILSMHarness.java | 30 ++- .../am/lsm/common/api/ILSMIOOperation.java | 15 +- .../lsm/common/api/ILSMIOOperationCallback.java | 13 +- .../api/ILSMIOOperationCallbackFactory.java | 2 +- .../api/ILSMIOOperationCallbackProvider.java | 3 +- .../storage/am/lsm/common/api/ILSMIndex.java | 92 +++++++- .../am/lsm/common/api/ILSMIndexAccessor.java | 41 ++-- .../common/api/ILSMIndexAccessorInternal.java | 42 ---- .../am/lsm/common/api/ILSMIndexInternal.java | 98 --------- .../common/api/ILSMIndexOperationContext.java | 20 +- .../am/lsm/common/api/ILSMMemoryComponent.java | 77 +++++++ .../am/lsm/common/api/ILSMMergePolicy.java | 7 +- .../lsm/common/api/ILSMMergePolicyFactory.java | 8 +- .../am/lsm/common/api/ILSMOperationTracker.java | 8 +- .../storage/am/lsm/common/api/ITwoPCIndex.java | 10 +- .../am/lsm/common/api/IVirtualBufferCache.java | 8 +- .../common/api/IVirtualBufferCacheProvider.java | 5 +- .../common/impls/AbstractDiskLSMComponent.java | 109 ---------- 
.../lsm/common/impls/AbstractLSMComponent.java | 33 +-- .../common/impls/AbstractLSMDiskComponent.java | 99 +++++++++ .../am/lsm/common/impls/AbstractLSMIndex.java | 66 +++--- .../impls/AbstractLSMMemoryComponent.java | 210 ++++++++++++++++++ .../impls/AbstractMemoryLSMComponent.java | 214 ------------------- .../am/lsm/common/impls/BTreeFactory.java | 3 +- .../BlockingIOOperationCallbackWrapper.java | 8 +- .../lsm/common/impls/ConstantMergePolicy.java | 21 +- .../lsm/common/impls/DiskComponentMetadata.java | 56 +++++ .../lsm/common/impls/ExternalIndexHarness.java | 39 ++-- .../am/lsm/common/impls/IndexFactory.java | 3 +- .../common/impls/LSMComponentFilterFactory.java | 2 +- .../common/impls/LSMComponentFilterManager.java | 8 +- .../storage/am/lsm/common/impls/LSMHarness.java | 57 +++-- .../lsm/common/impls/LSMTreeIndexAccessor.java | 10 +- .../common/impls/MemoryComponentMetadata.java | 87 ++++++++ .../common/impls/NoOpIOOperationCallback.java | 12 +- .../am/lsm/common/impls/PrefixMergePolicy.java | 27 +-- .../lsm/common/impls/ThreadCountingTracker.java | 13 +- .../lsm/common/utils/ComponentMetadataUtil.java | 118 ++++++++++ .../pom.xml | 7 +- .../lsm/invertedindex/api/IInvertedIndex.java | 12 +- .../LSMInvertedIndexDataflowHelper.java | 2 +- ...rtitionedLSMInvertedIndexDataflowHelper.java | 2 +- .../invertedindex/impls/LSMInvertedIndex.java | 165 +++++++------- .../impls/LSMInvertedIndexAccessor.java | 14 +- .../impls/LSMInvertedIndexDiskComponent.java | 9 +- .../LSMInvertedIndexDiskComponentFactory.java | 22 +- .../impls/LSMInvertedIndexFlushOperation.java | 8 +- .../impls/LSMInvertedIndexMemoryComponent.java | 6 +- .../impls/LSMInvertedIndexMergeOperation.java | 10 +- .../impls/LSMInvertedIndexOpContext.java | 22 +- .../inmemory/InMemoryInvertedIndex.java | 7 +- .../PartitionedInMemoryInvertedIndex.java | 16 +- .../ondisk/OnDiskInvertedIndex.java | 51 +++-- .../ondisk/OnDiskInvertedIndexFactory.java | 3 +- .../ondisk/PartitionedOnDiskInvertedIndex.java | 6 +- 
.../PartitionedOnDiskInvertedIndexFactory.java | 12 +- .../invertedindex/util/InvertedIndexUtils.java | 89 ++++---- .../dataflow/ExternalRTreeDataflowHelper.java | 2 +- .../rtree/dataflow/LSMRTreeDataflowHelper.java | 2 +- ...RTreeWithAntiMatterTuplesDataflowHelper.java | 2 +- .../am/lsm/rtree/impls/AbstractLSMRTree.java | 87 ++++---- .../am/lsm/rtree/impls/ExternalRTree.java | 105 ++++----- .../lsm/rtree/impls/ExternalRTreeOpContext.java | 21 +- .../storage/am/lsm/rtree/impls/LSMRTree.java | 135 ++++++------ .../lsm/rtree/impls/LSMRTreeDiskComponent.java | 7 +- .../impls/LSMRTreeDiskComponentFactory.java | 17 +- .../lsm/rtree/impls/LSMRTreeFlushOperation.java | 8 +- .../rtree/impls/LSMRTreeMemoryComponent.java | 6 +- .../lsm/rtree/impls/LSMRTreeMergeOperation.java | 10 +- .../am/lsm/rtree/impls/LSMRTreeOpContext.java | 25 +-- .../impls/LSMRTreeWithAntiMatterTuples.java | 76 +++---- ...ithAntiMatterTuplesDiskComponentFactory.java | 17 +- .../am/lsm/rtree/impls/RTreeFactory.java | 3 +- .../common/buffercache/IBufferCache.java | 42 ++-- .../storage/am/btree/BTreeExamplesTest.java | 7 +- .../btree/multithread/BTreeMultiThreadTest.java | 19 +- .../storage/am/lsm/common/DummyTreeFactory.java | 3 +- 153 files changed, 2677 insertions(+), 2174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java deleted file mode 100644 index d9a0a79..0000000 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.api.http.servlet; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayOutputStream; -import java.io.PrintWriter; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.asterix.api.http.server.ConnectorApiServlet; -import org.apache.asterix.file.StorageComponentProvider; -import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.metadata.MetadataTransactionContext; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.utils.JSONDeserializerForTypes; -import org.apache.asterix.test.runtime.SqlppExecutionTest; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.client.NodeControllerInfo; -import org.apache.hyracks.api.comm.NetworkAddress; -import org.apache.hyracks.api.io.FileSplit; -import org.apache.hyracks.api.io.ManagedFileSplit; -import 
org.apache.hyracks.http.api.IServletRequest; -import org.apache.hyracks.http.api.IServletResponse; -import org.junit.Assert; -import org.junit.Test; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpMethod; -import junit.extensions.PA; - -public class ConnectorApiLetTest { - - @Test - public void testGet() throws Exception { - // Starts test asterixdb cluster. - SqlppExecutionTest.setUp(); - - // Configures a test connector api servlet. - ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" }); - Map<String, NodeControllerInfo> nodeMap = new HashMap<>(); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - PrintWriter outputWriter = new PrintWriter(outputStream); - - // Creates mocks. - IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class); - NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class); - NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class); - IServletRequest mockRequest = mock(IServletRequest.class); - IServletResponse mockResponse = mock(IServletResponse.class); - FullHttpRequest mockHttpRequest = mock(FullHttpRequest.class); - - // Put stuff in let map - let.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc); - // Sets up mock returns. 
- when(mockRequest.getHttpRequest()).thenReturn(mockHttpRequest); - when(mockHttpRequest.method()).thenReturn(HttpMethod.GET); - when(mockRequest.getParameter("dataverseName")).thenReturn("Metadata"); - when(mockRequest.getParameter("datasetName")).thenReturn("Dataset"); - when(mockResponse.writer()).thenReturn(outputWriter); - when(mockHcc.getNodeControllerInfos()).thenReturn(nodeMap); - when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099)); - when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099)); - - // Calls ConnectorAPIServlet.formResponseObject. - nodeMap.put("asterix_nc1", mockInfo1); - nodeMap.put("asterix_nc2", mockInfo2); - let.handle(mockRequest, mockResponse); - - // Constructs the actual response. - ObjectMapper om = new ObjectMapper(); - ObjectNode actualResponse = (ObjectNode) om.readTree(outputStream.toString()); - - // Checks the temp-or-not, primary key, data type of the dataset. - boolean temp = actualResponse.get("temp").asBoolean(); - Assert.assertFalse(temp); - String primaryKey = actualResponse.get("keys").asText(); - Assert.assertEquals("DataverseName,DatasetName", primaryKey); - ARecordType recordType = (ARecordType) JSONDeserializerForTypes.convertFromJSON(actualResponse.get("type")); - Assert.assertEquals(getMetadataRecordType("Metadata", "Dataset"), recordType); - - // Checks the correctness of results. - ArrayNode splits = (ArrayNode) actualResponse.get("splits"); - String path = (splits.get(0)).get("path").asText(); - Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset")); - - // Tears down the asterixdb cluster. 
- SqlppExecutionTest.tearDown(); - } - - @Test - public void testFormResponseObject() throws Exception { - ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" }); - ObjectMapper om = new ObjectMapper(); - ObjectNode actualResponse = om.createObjectNode(); - FileSplit[] splits = new FileSplit[2]; - splits[0] = new ManagedFileSplit("asterix_nc1", "foo1"); - splits[1] = new ManagedFileSplit("asterix_nc2", "foo2"); - Map<String, NodeControllerInfo> nodeMap = new HashMap<>(); - NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class); - NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class); - - // Sets up mock returns. - when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099)); - when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099)); - - String[] fieldNames = new String[] { "a1", "a2" }; - IAType[] fieldTypes = new IAType[] { BuiltinType.ABOOLEAN, BuiltinType.ADAYTIMEDURATION }; - ARecordType recordType = new ARecordType("record", fieldNames, fieldTypes, true); - String primaryKey = "a1"; - - // Calls ConnectorAPIServlet.formResponseObject. - nodeMap.put("asterix_nc1", mockInfo1); - nodeMap.put("asterix_nc2", mockInfo2); - PA.invokeMethod(let, - "formResponseObject(" + ObjectNode.class.getName() + ", " + FileSplit.class.getName() + "[], " - + ARecordType.class.getName() + ", " + String.class.getName() + ", boolean, " - + Map.class.getName() + ")", - actualResponse, splits, recordType, primaryKey, true, nodeMap); - // Constructs expected response. 
- ObjectNode expectedResponse = om.createObjectNode(); - expectedResponse.put("temp", true); - expectedResponse.put("keys", primaryKey); - expectedResponse.set("type", recordType.toJSON()); - ArrayNode splitsArray = om.createArrayNode(); - ObjectNode element1 = om.createObjectNode(); - element1.put("ip", "127.0.0.1"); - element1.put("path", splits[0].getPath()); - ObjectNode element2 = om.createObjectNode(); - element2.put("ip", "127.0.0.2"); - element2.put("path", splits[1].getPath()); - splitsArray.add(element1); - splitsArray.add(element2); - expectedResponse.set("splits", splitsArray); - - // Checks results. - Assert.assertEquals(actualResponse.toString(), expectedResponse.toString()); - } - - private ARecordType getMetadataRecordType(String dataverseName, String datasetName) throws Exception { - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - // Retrieves file splits of the dataset. - MetadataProvider metadataProvider = new MetadataProvider(null, new StorageComponentProvider()); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName); - ARecordType recordType = - (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); - // Metadata transaction commits. 
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - return recordType; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java new file mode 100644 index 0000000..7da3b32 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.api.http.servlet; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.asterix.api.http.server.ConnectorApiServlet; +import org.apache.asterix.file.StorageComponentProvider; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.utils.JSONDeserializerForTypes; +import org.apache.asterix.test.runtime.SqlppExecutionTest; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.io.ManagedFileSplit; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.junit.Assert; +import org.junit.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpMethod; +import junit.extensions.PA; + +public class ConnectorApiServletTest { + + @Test + public void testGet() throws Exception { + // Starts test asterixdb cluster. + SqlppExecutionTest.setUp(); + + // Configures a test connector api servlet. 
+ ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" }); + Map<String, NodeControllerInfo> nodeMap = new HashMap<>(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + PrintWriter outputWriter = new PrintWriter(outputStream); + + // Creates mocks. + IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class); + NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class); + NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class); + IServletRequest mockRequest = mock(IServletRequest.class); + IServletResponse mockResponse = mock(IServletResponse.class); + FullHttpRequest mockHttpRequest = mock(FullHttpRequest.class); + + // Put stuff in let map + let.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc); + // Sets up mock returns. + when(mockRequest.getHttpRequest()).thenReturn(mockHttpRequest); + when(mockHttpRequest.method()).thenReturn(HttpMethod.GET); + when(mockRequest.getParameter("dataverseName")).thenReturn("Metadata"); + when(mockRequest.getParameter("datasetName")).thenReturn("Dataset"); + when(mockResponse.writer()).thenReturn(outputWriter); + when(mockHcc.getNodeControllerInfos()).thenReturn(nodeMap); + when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099)); + when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099)); + + // Calls ConnectorAPIServlet.formResponseObject. + nodeMap.put("asterix_nc1", mockInfo1); + nodeMap.put("asterix_nc2", mockInfo2); + let.handle(mockRequest, mockResponse); + + // Constructs the actual response. + ObjectMapper om = new ObjectMapper(); + ObjectNode actualResponse = (ObjectNode) om.readTree(outputStream.toString()); + + // Checks the temp-or-not, primary key, data type of the dataset. 
+ boolean temp = actualResponse.get("temp").asBoolean(); + Assert.assertFalse(temp); + String primaryKey = actualResponse.get("keys").asText(); + Assert.assertEquals("DataverseName,DatasetName", primaryKey); + ARecordType recordType = (ARecordType) JSONDeserializerForTypes.convertFromJSON(actualResponse.get("type")); + Assert.assertEquals(getMetadataRecordType("Metadata", "Dataset"), recordType); + + // Checks the correctness of results. + ArrayNode splits = (ArrayNode) actualResponse.get("splits"); + String path = (splits.get(0)).get("path").asText(); + Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset")); + + // Tears down the asterixdb cluster. + SqlppExecutionTest.tearDown(); + } + + @Test + public void testFormResponseObject() throws Exception { + ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" }); + ObjectMapper om = new ObjectMapper(); + ObjectNode actualResponse = om.createObjectNode(); + FileSplit[] splits = new FileSplit[2]; + splits[0] = new ManagedFileSplit("asterix_nc1", "foo1"); + splits[1] = new ManagedFileSplit("asterix_nc2", "foo2"); + Map<String, NodeControllerInfo> nodeMap = new HashMap<>(); + NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class); + NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class); + + // Sets up mock returns. + when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099)); + when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099)); + + String[] fieldNames = new String[] { "a1", "a2" }; + IAType[] fieldTypes = new IAType[] { BuiltinType.ABOOLEAN, BuiltinType.ADAYTIMEDURATION }; + ARecordType recordType = new ARecordType("record", fieldNames, fieldTypes, true); + String primaryKey = "a1"; + + // Calls ConnectorAPIServlet.formResponseObject. 
+ nodeMap.put("asterix_nc1", mockInfo1); + nodeMap.put("asterix_nc2", mockInfo2); + PA.invokeMethod(let, + "formResponseObject(" + ObjectNode.class.getName() + ", " + FileSplit.class.getName() + "[], " + + ARecordType.class.getName() + ", " + String.class.getName() + ", boolean, " + + Map.class.getName() + ")", + actualResponse, splits, recordType, primaryKey, true, nodeMap); + // Constructs expected response. + ObjectNode expectedResponse = om.createObjectNode(); + expectedResponse.put("temp", true); + expectedResponse.put("keys", primaryKey); + expectedResponse.set("type", recordType.toJSON()); + ArrayNode splitsArray = om.createArrayNode(); + ObjectNode element1 = om.createObjectNode(); + element1.put("ip", "127.0.0.1"); + element1.put("path", splits[0].getPath()); + ObjectNode element2 = om.createObjectNode(); + element2.put("ip", "127.0.0.2"); + element2.put("path", splits[1].getPath()); + splitsArray.add(element1); + splitsArray.add(element2); + expectedResponse.set("splits", splitsArray); + + // Checks results. + Assert.assertEquals(actualResponse.toString(), expectedResponse.toString()); + } + + private ARecordType getMetadataRecordType(String dataverseName, String datasetName) throws Exception { + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + // Retrieves file splits of the dataset. + MetadataProvider metadataProvider = new MetadataProvider(null, new StorageComponentProvider()); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName); + ARecordType recordType = + (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); + // Metadata transaction commits. 
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + return recordType; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java deleted file mode 100644 index 619e7a5..0000000 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.asterix.api.http.servlet; - -import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayOutputStream; -import java.io.PrintWriter; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.asterix.api.http.server.VersionApiServlet; -import org.apache.asterix.common.config.BuildProperties; -import org.apache.asterix.runtime.utils.AppContextInfo; -import org.apache.asterix.test.runtime.SqlppExecutionTest; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.http.api.IServletRequest; -import org.apache.hyracks.http.api.IServletResponse; -import org.junit.Assert; -import org.junit.Test; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpMethod; - -public class VersionApiLetTest { - - @Test - public void testGet() throws Exception { - // Starts test asterixdb cluster. - SqlppExecutionTest.setUp(); - - // Configures a test version api servlet. - VersionApiServlet servlet = new VersionApiServlet(new ConcurrentHashMap<>(), new String[] { "/" }); - Map<String, String> propMap = new HashMap<>(); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - PrintWriter outputWriter = new PrintWriter(outputStream); - - // Creates mocks. 
- AppContextInfo mockCtx = mock(AppContextInfo.class); - IServletRequest mockRequest = mock(IServletRequest.class); - IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class); - IServletResponse mockResponse = mock(IServletResponse.class); - BuildProperties mockProperties = mock(BuildProperties.class); - FullHttpRequest mockHttpRequest = mock(FullHttpRequest.class); - - // Put stuff in let map - servlet.ctx().put(HYRACKS_CONNECTION_ATTR, mockHcc); - servlet.ctx().put(ASTERIX_BUILD_PROP_ATTR, mockCtx); - // Sets up mock returns. - when(mockResponse.writer()).thenReturn(outputWriter); - when(mockRequest.getHttpRequest()).thenReturn(mockHttpRequest); - when(mockHttpRequest.method()).thenReturn(HttpMethod.GET); - when(mockCtx.getBuildProperties()).thenReturn(mockProperties); - when(mockProperties.getAllProps()).thenReturn(propMap); - - propMap.put("git.build.user.email", "[email protected]"); - propMap.put("git.build.host", "fulliautomatix"); - propMap.put("git.dirty", "true"); - propMap.put("git.remote.origin.url", "[email protected]:apache/incubator-asterixdb.git"); - propMap.put("git.closest.tag.name", "asterix-0.8.7-incubating"); - propMap.put("git.commit.id.describe-short", "asterix-0.8.7-incubating-19-dirty"); - propMap.put("git.commit.user.email", "[email protected]"); - propMap.put("git.commit.time", "21.10.2015 @ 23:36:41 PDT"); - propMap.put("git.commit.message.full", - "ASTERIXDB-1045: fix log file reading during recovery\n\nChange-Id: Ic83ee1dd2d7ba88180c25f4ec6c7aa8d0a5a7162\nReviewed-on: https://asterix-gerrit.ics.uci.edu/465\nTested-by: Jenkins <[email protected]>"); - propMap.put("git.build.version", "0.8.8-SNAPSHOT"); - propMap.put("git.commit.message.short", "ASTERIXDB-1045: fix log file reading during recovery"); - propMap.put("git.commit.id.abbrev", "e1dad19"); - propMap.put("git.branch", "foo/bar"); - propMap.put("git.build.user.name", "Asterix"); - propMap.put("git.closest.tag.commit.count", "19"); - 
propMap.put("git.commit.id.describe", "asterix-0.8.7-incubating-19-ge1dad19-dirty"); - propMap.put("git.commit.id", "e1dad1984640517366a7e73e323c9de27b0676f7"); - propMap.put("git.tags", ""); - propMap.put("git.build.time", "22.10.2015 @ 17:11:07 PDT"); - propMap.put("git.commit.user.name", "Obelix"); - - // Calls VersionAPIServlet.formResponseObject. - servlet.handle(mockRequest, mockResponse); - - // Constructs the actual response. - - ObjectMapper om = new ObjectMapper(); - ObjectNode actualResponse = (ObjectNode) om.readTree(outputStream.toByteArray()); - ObjectNode expectedResponse = om.createObjectNode(); - for (Map.Entry<String, String> e : propMap.entrySet()) { - expectedResponse.put(e.getKey(), e.getValue()); - } - - // Checks the response contains all the expected keys. - Assert.assertEquals(actualResponse.toString(), expectedResponse.toString()); - - // Tears down the asterixdb cluster. - SqlppExecutionTest.tearDown(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java new file mode 100644 index 0000000..7fed010 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.api.http.servlet; + +import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR; +import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.asterix.api.http.server.VersionApiServlet; +import org.apache.asterix.common.config.BuildProperties; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.test.runtime.SqlppExecutionTest; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.junit.Assert; +import org.junit.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpMethod; + +public class VersionApiServletTest { + + @Test + public void testGet() throws Exception { + // Starts test asterixdb cluster. + SqlppExecutionTest.setUp(); + + // Configures a test version api servlet. 
+ VersionApiServlet servlet = new VersionApiServlet(new ConcurrentHashMap<>(), new String[] { "/" }); + Map<String, String> propMap = new HashMap<>(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + PrintWriter outputWriter = new PrintWriter(outputStream); + + // Creates mocks. + AppContextInfo mockCtx = mock(AppContextInfo.class); + IServletRequest mockRequest = mock(IServletRequest.class); + IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class); + IServletResponse mockResponse = mock(IServletResponse.class); + BuildProperties mockProperties = mock(BuildProperties.class); + FullHttpRequest mockHttpRequest = mock(FullHttpRequest.class); + + // Put stuff in let map + servlet.ctx().put(HYRACKS_CONNECTION_ATTR, mockHcc); + servlet.ctx().put(ASTERIX_BUILD_PROP_ATTR, mockCtx); + // Sets up mock returns. + when(mockResponse.writer()).thenReturn(outputWriter); + when(mockRequest.getHttpRequest()).thenReturn(mockHttpRequest); + when(mockHttpRequest.method()).thenReturn(HttpMethod.GET); + when(mockCtx.getBuildProperties()).thenReturn(mockProperties); + when(mockProperties.getAllProps()).thenReturn(propMap); + + propMap.put("git.build.user.email", "[email protected]"); + propMap.put("git.build.host", "fulliautomatix"); + propMap.put("git.dirty", "true"); + propMap.put("git.remote.origin.url", "[email protected]:apache/incubator-asterixdb.git"); + propMap.put("git.closest.tag.name", "asterix-0.8.7-incubating"); + propMap.put("git.commit.id.describe-short", "asterix-0.8.7-incubating-19-dirty"); + propMap.put("git.commit.user.email", "[email protected]"); + propMap.put("git.commit.time", "21.10.2015 @ 23:36:41 PDT"); + propMap.put("git.commit.message.full", + "ASTERIXDB-1045: fix log file reading during recovery\n\nChange-Id: Ic83ee1dd2d7ba88180c25f4ec6c7aa8d0a5a7162\nReviewed-on: https://asterix-gerrit.ics.uci.edu/465\nTested-by: Jenkins <[email protected]>"); + propMap.put("git.build.version", "0.8.8-SNAPSHOT"); + 
propMap.put("git.commit.message.short", "ASTERIXDB-1045: fix log file reading during recovery"); + propMap.put("git.commit.id.abbrev", "e1dad19"); + propMap.put("git.branch", "foo/bar"); + propMap.put("git.build.user.name", "Asterix"); + propMap.put("git.closest.tag.commit.count", "19"); + propMap.put("git.commit.id.describe", "asterix-0.8.7-incubating-19-ge1dad19-dirty"); + propMap.put("git.commit.id", "e1dad1984640517366a7e73e323c9de27b0676f7"); + propMap.put("git.tags", ""); + propMap.put("git.build.time", "22.10.2015 @ 17:11:07 PDT"); + propMap.put("git.commit.user.name", "Obelix"); + + // Calls VersionAPIServlet.formResponseObject. + servlet.handle(mockRequest, mockResponse); + + // Constructs the actual response. + + ObjectMapper om = new ObjectMapper(); + ObjectNode actualResponse = (ObjectNode) om.readTree(outputStream.toByteArray()); + ObjectNode expectedResponse = om.createObjectNode(); + for (Map.Entry<String, String> e : propMap.entrySet()) { + expectedResponse.put(e.getKey(), e.getValue()); + } + + // Checks the response contains all the expected keys. + Assert.assertEquals(actualResponse.toString(), expectedResponse.toString()); + + // Tears down the asterixdb cluster. 
+ SqlppExecutionTest.tearDown(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java index edd1848..1467dbf 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java @@ -51,6 +51,7 @@ import org.apache.hyracks.api.test.CountAnswer; import org.apache.hyracks.api.test.FrameWriterTestUtils; import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation; import org.apache.hyracks.api.util.HyracksConstants; +import org.apache.hyracks.data.std.primitive.LongPointable; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; @@ -58,6 +59,7 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree; import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -68,8 +70,8 @@ public class LogMarkerTest { private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 }; private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" }, new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false); - private static final GenerationFunction[] RECORD_GEN_FUNCTION = 
{ GenerationFunction.DETERMINISTIC, - GenerationFunction.DETERMINISTIC }; + private static final GenerationFunction[] RECORD_GEN_FUNCTION = + { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC }; private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false }; private static final ARecordType META_TYPE = null; private static final GenerationFunction[] META_GEN_FUNCTION = null; @@ -146,7 +148,9 @@ public class LogMarkerTest { KEY_INDICATORS_LIST); dataflowHelper.open(); LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance(); - long lsn = btree.getMostRecentMarkerLSN(); + LongPointable longPointable = LongPointable.FACTORY.createPointable(); + ComponentMetadataUtil.get(btree, ComponentMetadataUtil.MARKER_LSN_KEY, longPointable); + long lsn = longPointable.getLong(); int numOfMarkers = 0; LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false); long expectedMarkerId = markerId - 1; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index 70339f3..e16ce78 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -30,12 +30,12 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager; import org.apache.hyracks.storage.am.common.api.IndexException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; +import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractDiskLSMComponent; public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { @@ -60,18 +60,18 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { // all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component. // 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule // a merge all of the current candidates into a new single component. - List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents()); + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); // Reverse the components order so that we look at components from oldest to newest. 
Collections.reverse(immutableComponents); - for (ILSMComponent c : immutableComponents) { + for (ILSMDiskComponent c : immutableComponents) { if (c.getState() != ComponentState.READABLE_UNWRITABLE) { return; } } if (fullMergeIsRequested) { - ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + ILSMIndexAccessor accessor = + index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleFullMerge(index.getIOOperationCallback()); return; } @@ -89,7 +89,7 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { for (int i = 0; i < minNumComponents; i++) { ILSMComponent c = immutableComponents.get(i); - long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize(); + long componentSize = ((ILSMDiskComponent) c).getComponentSize(); if (componentSize > maxMergableComponentSize) { startIndex = i; totalSize = 0; @@ -101,20 +101,20 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) { for (ILSMIndex lsmIndex : indexes) { - List<ILSMComponent> reversedImmutableComponents = new ArrayList<ILSMComponent>( - lsmIndex.getImmutableComponents()); + List<ILSMDiskComponent> reversedImmutableComponents = + new ArrayList<>(lsmIndex.getImmutableComponents()); // Reverse the components order so that we look at components from oldest to newest. 
Collections.reverse(reversedImmutableComponents); - List<ILSMComponent> mergableComponents = new ArrayList<ILSMComponent>(); + List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); for (int j = startIndex + 1; j <= i; j++) { mergableComponents.add(reversedImmutableComponents.get(j)); } // Reverse the components order back to its original order Collections.reverse(mergableComponents); - ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); } break; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 6350d73..7fca039 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -35,7 +35,6 @@ import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; @@ -101,8 +100,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { if (!flushOnExit) { for (ILSMIndex lsmIndex : indexes) { - ILSMIndexInternal lsmIndexInternal = (ILSMIndexInternal) lsmIndex; - if (lsmIndexInternal.hasFlushRequestForCurrentMutableComponent()) { + if (lsmIndex.hasFlushRequestForCurrentMutableComponent()) { needsFlush = true; break; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java index cd40179..04090bb 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java @@ -21,7 +21,8 @@ package org.apache.asterix.common.dataflow; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.transactions.ILogManager; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; public class LSMIndexUtil { @@ -33,18 +34,18 @@ public class LSMIndexUtil { //prevent transactions from incorrectly setting the first LSN on a modified component by checking the index is still empty synchronized (lsmIndex.getOperationTracker()) { if (lsmIndex.isCurrentMutableComponentEmpty()) { - AbstractLSMIOOperationCallback ioOpCallback = 
(AbstractLSMIOOperationCallback) lsmIndex - .getIOOperationCallback(); + AbstractLSMIOOperationCallback ioOpCallback = + (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); ioOpCallback.setFirstLSN(logManager.getAppendLSN()); } } } } - public static long getComponentFileLSNOffset(AbstractLSMIndex lsmIndex, ILSMComponent lsmComponent, + public static long getComponentFileLSNOffset(ILSMIndex lsmIndex, ILSMDiskComponent lsmComponent, String componentFilePath) throws HyracksDataException { - AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex - .getIOOperationCallback(); + AbstractLSMIOOperationCallback ioOpCallback = + (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); return ioOpCallback.getComponentFileLSNOffset(lsmComponent, componentFilePath); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java index fded0d9..0c099fb 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java @@ -24,6 +24,7 @@ import java.io.Serializable; import org.apache.hyracks.api.exceptions.HyracksDataException; public class RuntimeDataException extends HyracksDataException { + private static final long serialVersionUID = 1L; public RuntimeDataException(int errorCode, Serializable... 
params) { super(ErrorCode.ASTERIX, errorCode, ErrorCode.getErrorMessage(errorCode), params); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java index 29af7c9..f903b65 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java @@ -27,13 +27,13 @@ import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; // A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback { - public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN" - .getBytes()); + public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN".getBytes()); public static final long INVALID = -1L; // First LSN per mutable component @@ -78,7 +78,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } @Override - public void 
afterFinalize(LSMOperationType opType, ILSMComponent newComponent) { + public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) { // The operation was complete and the next I/O operation for the LSM index didn't start yet if (opType == LSMOperationType.FLUSH && newComponent != null) { synchronized (this) { @@ -93,15 +93,11 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } } - public abstract long getComponentLSN(List<ILSMComponent> oldComponents) throws HyracksDataException; + public abstract long getComponentLSN(List<? extends ILSMComponent> oldComponents) throws HyracksDataException; - public void putLSNIntoMetadata(ITreeIndex treeIndex, List<ILSMComponent> oldComponents) + public void putLSNIntoMetadata(ILSMDiskComponent index, List<ILSMComponent> oldComponents) throws HyracksDataException { - byte[] lsn = new byte[Long.BYTES]; - LongPointable.setLong(lsn, 0, getComponentLSN(oldComponents)); - IMetadataPageManager metadataPageManager = (IMetadataPageManager) treeIndex.getPageManager(); - metadataPageManager.put(metadataPageManager.createMetadataFrame(), LSN_KEY, new MutableArrayValueReference( - lsn)); + index.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents))); } public static long getTreeIndexLSN(ITreeIndex treeIndex) throws HyracksDataException { @@ -144,6 +140,6 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC * otherwise {@link IMetadataPageManager#INVALID_LSN_OFFSET}. 
* @throws HyracksDataException */ - public abstract long getComponentFileLSNOffset(ILSMComponent component, String componentFilePath) + public abstract long getComponentFileLSNOffset(ILSMDiskComponent component, String componentFilePath) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java index 142b84f..173c962 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java @@ -22,12 +22,15 @@ package org.apache.asterix.common.ioopcallbacks; import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.primitive.LongPointable; import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent; import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil; public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback { @@ -36,16 +39,24 @@ public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback } @Override - 
public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent) - throws HyracksDataException { + public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, + ILSMDiskComponent newComponent) throws HyracksDataException { + //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here if (newComponent != null) { LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) newComponent; - putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents); + putLSNIntoMetadata(btreeComponent, oldComponents); + if (opType == LSMOperationType.MERGE) { + LongPointable markerLsn = LongPointable.FACTORY + .createPointable(ComponentMetadataUtil.getLong(oldComponents.get(0).getMetadata(), + ComponentMetadataUtil.MARKER_LSN_KEY, ComponentMetadataUtil.NOT_FOUND)); + btreeComponent.getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, markerLsn); + } + } } @Override - public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException { + public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException { if (diskComponents == null) { // Implies a flush IO operation. --> moves the flush pointer // Flush operation of an LSM index are executed sequentially. 
@@ -64,12 +75,12 @@ public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback } @Override - public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath) + public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath) throws HyracksDataException { if (diskComponentFilePath.endsWith(LSMBTreeFileManager.BTREE_STRING)) { LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) diskComponent; - IMetadataPageManager metadataPageManager = (IMetadataPageManager) btreeComponent.getBTree() - .getPageManager(); + IMetadataPageManager metadataPageManager = + (IMetadataPageManager) btreeComponent.getBTree().getPageManager(); return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY); } return INVALID; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java index 21882dd..322b5ef 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java @@ -32,7 +32,7 @@ public class LSMBTreeIOOperationCallbackFactory implements ILSMIOOperationCallba } @Override - public ILSMIOOperationCallback createIOOperationCallback() { + public ILSMIOOperationCallback createIoOpCallback() { return new LSMBTreeIOOperationCallback(); } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java index a8c545d..6c987d6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java @@ -25,21 +25,22 @@ import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyDiskComponent; import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperationCallback { @Override - public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent) - throws HyracksDataException { + public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, + ILSMDiskComponent newComponent) throws HyracksDataException { if (newComponent != null) { LSMBTreeWithBuddyDiskComponent btreeComponent = (LSMBTreeWithBuddyDiskComponent) newComponent; - putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents); + putLSNIntoMetadata(btreeComponent, oldComponents); } } @Override - public long getComponentLSN(List<ILSMComponent> diskComponents) throws 
HyracksDataException { + public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException { if (diskComponents == null) { // Implies a flush IO operation <Will never happen currently as Btree with buddy btree is only used with external datasets> synchronized (this) { @@ -57,12 +58,12 @@ public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperation } @Override - public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath) + public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath) throws HyracksDataException { if (diskComponentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_STRING)) { LSMBTreeWithBuddyDiskComponent btreeComponent = (LSMBTreeWithBuddyDiskComponent) diskComponent; - IMetadataPageManager metadataPageManager = (IMetadataPageManager) btreeComponent.getBTree() - .getPageManager(); + IMetadataPageManager metadataPageManager = + (IMetadataPageManager) btreeComponent.getBTree().getPageManager(); return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY); } return INVALID; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java index 848de29..1055fa6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java 
@@ -32,7 +32,7 @@ public class LSMBTreeWithBuddyIOOperationCallbackFactory implements ILSMIOOperat } @Override - public ILSMIOOperationCallback createIOOperationCallback() { + public ILSMIOOperationCallback createIoOpCallback() { return new LSMBTreeWithBuddyIOOperationCallback(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java index ec09d65..657d908 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent; import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager; @@ -35,16 +36,16 @@ public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationC } @Override - public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent) - throws HyracksDataException { + public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, + 
ILSMDiskComponent newComponent) throws HyracksDataException { if (newComponent != null) { LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) newComponent; - putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree(), oldComponents); + putLSNIntoMetadata(invIndexComponent, oldComponents); } } @Override - public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException { + public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException { if (diskComponents == null) { // Implies a flush IO operation. synchronized (this) { @@ -56,18 +57,19 @@ public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationC long maxLSN = -1; for (Object o : diskComponents) { LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) o; - maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(invIndexComponent.getDeletedKeysBTree()), maxLSN); + maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(invIndexComponent.getDeletedKeysBTree()), + maxLSN); } return maxLSN; } @Override - public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath) + public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath) throws HyracksDataException { if (diskComponentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) { LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) diskComponent; - IMetadataPageManager metadataPageManager = (IMetadataPageManager) invIndexComponent.getDeletedKeysBTree() - .getPageManager(); + IMetadataPageManager metadataPageManager = + (IMetadataPageManager) invIndexComponent.getDeletedKeysBTree().getPageManager(); return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY); } return INVALID; 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java index 8951cb4..0fa0167 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java @@ -33,7 +33,7 @@ public class LSMInvertedIndexIOOperationCallbackFactory implements ILSMIOOperati } @Override - public ILSMIOOperationCallback createIOOperationCallback() { + public ILSMIOOperationCallback createIoOpCallback() { return new LSMInvertedIndexIOOperationCallback(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java index 175250d..2dc06f7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; import 
org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent; import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager; @@ -35,16 +36,16 @@ public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback } @Override - public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent) - throws HyracksDataException { + public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, + ILSMDiskComponent newComponent) throws HyracksDataException { if (newComponent != null) { LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) newComponent; - putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents); + putLSNIntoMetadata(rtreeComponent, oldComponents); } } @Override - public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException { + public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException { if (diskComponents == null) { // Implies a flush IO operation. 
synchronized (this) { @@ -62,12 +63,12 @@ public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback } @Override - public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath) + public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath) throws HyracksDataException { if (diskComponentFilePath.endsWith(LSMRTreeFileManager.RTREE_STRING)) { LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) diskComponent; - IMetadataPageManager metadataPageManager = (IMetadataPageManager) rtreeComponent.getRTree() - .getPageManager(); + IMetadataPageManager metadataPageManager = + (IMetadataPageManager) rtreeComponent.getRTree().getPageManager(); return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY); } return INVALID; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java index 954c6e1..83db16a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java @@ -32,7 +32,7 @@ public class LSMRTreeIOOperationCallbackFactory implements ILSMIOOperationCallba } @Override - public ILSMIOOperationCallback createIOOperationCallback() { + public ILSMIOOperationCallback createIoOpCallback() { return new LSMRTreeIOOperationCallback(); } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java index 7dae65f..b977c4d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java @@ -19,33 +19,104 @@ package org.apache.asterix.common.transactions; import java.nio.ByteBuffer; +import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; +import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil; /** * A basic callback used to write marker to transaction logs */ public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback { - - private AbstractLSMIndex index; + private final LongPointable pointable = LongPointable.FACTORY.createPointable(); + private final ILSMIndex index; /** * @param index: * a pointer to the primary index used to store marker log info * @throws HyracksDataException */ - public PrimaryIndexLogMarkerCallback(AbstractLSMIndex index) throws HyracksDataException { + public PrimaryIndexLogMarkerCallback(ILSMIndex index) throws HyracksDataException { this.index = index; } @Override public void 
before(ByteBuffer buffer) { - buffer.putLong(index.getCurrentMemoryComponent().getMostRecentMarkerLSN()); + buffer.putLong(getLsn()); + } + + private long getLsn() { + long lsn; + try { + lsn = ComponentMetadataUtil.getLong(index.getCurrentMemoryComponent().getMetadata(), + ComponentMetadataUtil.MARKER_LSN_KEY, ComponentMetadataUtil.NOT_FOUND); + } catch (HyracksDataException e) { + // Should never happen since this is a memory component + throw new IllegalStateException(e); + } + if (lsn == ComponentMetadataUtil.NOT_FOUND) { + synchronized (index.getOperationTracker()) { + // look for it in previous memory component if exists + lsn = lsnFromImmutableMemoryComponents(); + if (lsn == ComponentMetadataUtil.NOT_FOUND) { + // look for it in disk component + lsn = lsnFromDiskComponents(); + } + } + } + return lsn; + } + + private long lsnFromDiskComponents() { + List<ILSMDiskComponent> diskComponents = index.getImmutableComponents(); + for (ILSMDiskComponent c : diskComponents) { + try { + long lsn = ComponentMetadataUtil.getLong(c.getMetadata(), ComponentMetadataUtil.MARKER_LSN_KEY, + ComponentMetadataUtil.NOT_FOUND); + if (lsn != ComponentMetadataUtil.NOT_FOUND) { + return lsn; + } + } catch (HyracksDataException e) { + throw new IllegalStateException("Unable to read metadata page. 
Disk Error?", e); + } + } + return ComponentMetadataUtil.NOT_FOUND; + } + + private long lsnFromImmutableMemoryComponents() { + List<ILSMMemoryComponent> memComponents = index.getMemoryComponents(); + int numOtherMemComponents = memComponents.size() - 1; + int next = index.getCurrentMemoryComponentIndex(); + long lsn = ComponentMetadataUtil.NOT_FOUND; + for (int i = 0; i < numOtherMemComponents; i++) { + next = next - 1; + if (next < 0) { + next = memComponents.size() - 1; + } + ILSMMemoryComponent c = index.getMemoryComponents().get(next); + if (c.isReadable()) { + try { + lsn = ComponentMetadataUtil.getLong(c.getMetadata(), ComponentMetadataUtil.MARKER_LSN_KEY, + ComponentMetadataUtil.NOT_FOUND); + } catch (HyracksDataException e) { + // Should never happen since this is a memory component + throw new IllegalStateException(e); + } + if (lsn != ComponentMetadataUtil.NOT_FOUND) { + return lsn; + } + } + } + return lsn; } @Override public void after(long lsn) { - index.getCurrentMemoryComponent().setMostRecentMarkerLSN(lsn); + pointable.setLong(lsn); + index.getCurrentMemoryComponent().getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, pointable); } }
