This is an automated email from the ASF dual-hosted git repository.

vgalaxies pushed a commit to branch intro-hstore
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit 83086b1999c4bbf252ddcfec7a9ca4efa0aa1044
Author: VGalaxies <[email protected]>
AuthorDate: Fri Apr 26 13:48:31 2024 +0800

    git add hugegraph-server/hugegraph-hstore/
---
 hugegraph-server/hugegraph-hstore/pom.xml          |  50 ++
 .../backend/store/hstore/HstoreFeatures.java       | 133 ++++
 .../backend/store/hstore/HstoreMetrics.java        |  44 ++
 .../backend/store/hstore/HstoreOptions.java        |  52 ++
 .../backend/store/hstore/HstoreProvider.java       |  54 ++
 .../backend/store/hstore/HstoreSessions.java       | 208 ++++++
 .../backend/store/hstore/HstoreSessionsImpl.java   | 802 ++++++++++++++++++++
 .../backend/store/hstore/HstoreStore.java          | 825 +++++++++++++++++++++
 .../backend/store/hstore/HstoreTable.java          | 732 ++++++++++++++++++
 .../backend/store/hstore/HstoreTables.java         | 214 ++++++
 .../backend/store/hstore/fake/IdClient.java        |  54 ++
 .../backend/store/hstore/fake/PDIdClient.java      |  48 ++
 12 files changed, 3216 insertions(+)

diff --git a/hugegraph-server/hugegraph-hstore/pom.xml b/hugegraph-server/hugegraph-hstore/pom.xml
new file mode 100644
index 000000000..f777eb05e
--- /dev/null
+++ b/hugegraph-server/hugegraph-hstore/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with this
+  work for additional information regarding copyright ownership. The ASF
+  licenses this file to You under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  License for the specific language governing permissions and limitations
+  under the License.
+  -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hugegraph-server</artifactId>
+        <groupId>org.apache.hugegraph</groupId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hugegraph-hstore</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hugegraph</groupId>
+            <artifactId>hugegraph-core</artifactId>
+            <version>${revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hugegraph</groupId>
+            <artifactId>hg-store-client</artifactId>
+            <version>${revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hugegraph</groupId>
+            <artifactId>hg-pd-client</artifactId>
+            <version>${revision}</version>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreFeatures.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreFeatures.java
new file mode 100644
index 000000000..3af6f803b
--- /dev/null
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreFeatures.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore;
+
+import org.apache.hugegraph.backend.store.BackendFeatures;
+
+public class HstoreFeatures implements BackendFeatures {
+
+    @Override
+    public boolean supportsScanToken() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsScanKeyPrefix() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsScanKeyRange() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsQuerySchemaByName() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsQueryByLabel() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsQueryWithInCondition() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsQueryWithRangeCondition() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsQuerySortByInputIds() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsQueryWithOrderBy() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsQueryWithContains() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsQueryWithContainsKey() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsQueryByPage() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsDeleteEdgeByLabel() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsUpdateVertexProperty() {
+        // Vertex properties are stored in a cell(column value)
+        return false;
+    }
+
+    @Override
+    public boolean supportsMergeVertexProperty() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsUpdateEdgeProperty() {
+        // Edge properties are stored in a cell(column value)
+        return false;
+    }
+
+    @Override
+    public boolean supportsTransaction() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsNumberType() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsAggregateProperty() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsTtl() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsOlapProperties() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsTaskAndServerVertex() { return true; }
+}
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreMetrics.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreMetrics.java
new file mode 100644
index 000000000..c5f180887
--- /dev/null
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.backend.store.BackendMetrics;
+import org.apache.hugegraph.util.InsertionOrderUtil;
+
+public class HstoreMetrics implements BackendMetrics {
+
+    private final List<HstoreSessions> dbs;
+    private final HstoreSessions.Session session;
+
+    public HstoreMetrics(List<HstoreSessions> dbs,
+                         HstoreSessions.Session session) {
+        this.dbs = dbs;
+        this.session = session;
+    }
+
+    @Override
+    public Map<String, Object> metrics() {
+        Map<String, Object> results = InsertionOrderUtil.newMap();
+        // TODO(metrics): fetch more metrics from PD
+        results.put(NODES, session.getActiveStoreSize());
+        return results;
+    }
+}
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java
new file mode 100644
index 000000000..6de800697
--- /dev/null
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore;
+
+import static org.apache.hugegraph.config.OptionChecker.disallowEmpty;
+
+import org.apache.hugegraph.config.ConfigOption;
+import org.apache.hugegraph.config.OptionHolder;
+
+public class HstoreOptions extends OptionHolder {
+
+    public static final ConfigOption<Integer> PARTITION_COUNT = new ConfigOption<>(
+            "hstore.partition_count",
+            "Number of partitions, which PD controls partitions based on.",
+            disallowEmpty(),
+            0
+    );
+    public static final ConfigOption<Integer> SHARD_COUNT = new ConfigOption<>(
+            "hstore.shard_count",
+            "Number of copies, which PD controls partition copies based on.",
+            disallowEmpty(),
+            0
+    );
+    private static volatile HstoreOptions instance;
+
+    private HstoreOptions() {
+        super();
+    }
+
+    public static synchronized HstoreOptions instance() {
+        if (instance == null) {
+            instance = new HstoreOptions();
+            instance.registerOptions();
+        }
+        return instance;
+    }
+}
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreProvider.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreProvider.java
new file mode 100644
index 000000000..f9d48d36c
--- /dev/null
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore;
+
+import org.apache.hugegraph.backend.store.AbstractBackendStoreProvider;
+import org.apache.hugegraph.backend.store.BackendStore;
+import org.apache.hugegraph.config.HugeConfig;
+
+public class HstoreProvider extends AbstractBackendStoreProvider {
+
+    protected String namespace() {
+        return this.graph();
+    }
+
+    @Override
+    public String type() {
+        return "hstore";
+    }
+
+    @Override
+    public String driverVersion() {
+        return "1.13";
+    }
+
+    @Override
+    protected BackendStore newSchemaStore(HugeConfig config, String store) {
+        return new HstoreStore.HstoreSchemaStore(this, this.namespace(), store);
+    }
+
+    @Override
+    protected BackendStore newGraphStore(HugeConfig config, String store) {
+        return new HstoreStore.HstoreGraphStore(this, this.namespace(), store);
+    }
+
+    @Override
+    protected BackendStore newSystemStore(HugeConfig config, String store) {
+        return null;
+    }
+}
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessions.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessions.java
new file mode 100755
index 000000000..0abb6458b
--- /dev/null
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessions.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hugegraph.backend.query.Query;
+import org.apache.hugegraph.backend.store.BackendEntry;
+import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
+import org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession;
+import org.apache.hugegraph.backend.store.BackendSessionPool;
+import org.apache.hugegraph.config.HugeConfig;
+import org.apache.hugegraph.store.HgOwnerKey;
+import org.apache.hugegraph.type.define.GraphMode;
+
+public abstract class HstoreSessions extends BackendSessionPool {
+
+    public HstoreSessions(HugeConfig config, String database, String store) {
+        super(config, database + "/" + store);
+    }
+
+    public abstract Set<String> openedTables();
+
+    public abstract void createTable(String... tables);
+
+    public abstract void dropTable(String... tables);
+
+    public abstract boolean existsTable(String table);
+
+    public abstract void truncateTable(String table);
+
+    public abstract void clear();
+
+    @Override
+    public abstract Session session();
+
+    public interface Countable {
+
+        public long count();
+    }
+
+    /**
+     * Session for Hstore
+     */
+    public static abstract class Session extends AbstractBackendSession {
+
+        public static final int SCAN_ANY = 0x80;
+        public static final int SCAN_PREFIX_BEGIN = 0x01;
+        public static final int SCAN_PREFIX_END = 0x02;
+        public static final int SCAN_GT_BEGIN = 0x04;
+        public static final int SCAN_GTE_BEGIN = 0x0c;
+        public static final int SCAN_LT_END = 0x10;
+        public static final int SCAN_LTE_END = 0x30;
+        public static final int SCAN_KEY_ONLY = 0x40;
+        public static final int SCAN_HASHCODE = 0x100;
+
+        private HugeConfig conf;
+        private String graphName;
+
+        public static boolean matchScanType(int expected, int actual) {
+            return (expected & actual) == expected;
+        }
+
+        public abstract void createTable(String tableName);
+
+        public abstract void dropTable(String tableName);
+
+        public abstract boolean existsTable(String tableName);
+
+        public abstract void truncateTable(String tableName);
+
+        public abstract void deleteGraph();
+
+        public abstract Pair<byte[], byte[]> keyRange(String table);
+
+        public abstract void put(String table, byte[] ownerKey,
+                                 byte[] key, byte[] value);
+
+        public abstract void increase(String table, byte[] ownerKey,
+                                      byte[] key, byte[] value);
+
+        public abstract void delete(String table, byte[] ownerKey, byte[] key);
+
+        public abstract void deletePrefix(String table, byte[] ownerKey,
+                                          byte[] key);
+
+        public abstract void deleteRange(String table, byte[] ownerKeyFrom,
+                                         byte[] ownerKeyTo, byte[] keyFrom,
+                                         byte[] keyTo);
+
+        public abstract byte[] get(String table, byte[] key);
+
+        public abstract byte[] get(String table, byte[] ownerKey, byte[] key);
+
+        public abstract BackendColumnIterator scan(String table);
+
+        public abstract BackendColumnIterator scan(String table,
+                                                   byte[] ownerKey,
+                                                   byte[] prefix);
+
+        public BackendColumnIterator scan(String table, byte[] ownerKeyFrom,
+                                          byte[] ownerKeyTo, byte[] keyFrom,
+                                          byte[] keyTo) {
+            return this.scan(table, ownerKeyFrom, ownerKeyTo, keyFrom, keyTo,
+                             SCAN_LT_END);
+        }
+
+        public abstract List<BackendColumnIterator> scan(String table,
+                                                         List<HgOwnerKey> keys,
+                                                         int scanType,
+                                                         long limit,
+                                                         byte[] query);
+
+        public abstract BackendEntry.BackendIterator<BackendColumnIterator> scan(String table,
+                                                                                 Iterator<HgOwnerKey> keys,
+                                                                                 int scanType,
+                                                                                 Query queryParam,
+                                                                                 byte[] query);
+
+        public abstract BackendColumnIterator scan(String table,
+                                                   byte[] ownerKeyFrom,
+                                                   byte[] ownerKeyTo,
+                                                   byte[] keyFrom,
+                                                   byte[] keyTo,
+                                                   int scanType);
+
+        public abstract BackendColumnIterator scan(String table,
+                                                   byte[] ownerKeyFrom,
+                                                   byte[] ownerKeyTo,
+                                                   byte[] keyFrom,
+                                                   byte[] keyTo,
+                                                   int scanType,
+                                                   byte[] query);
+
+        public abstract BackendColumnIterator scan(String table,
+                                                   byte[] ownerKeyFrom,
+                                                   byte[] ownerKeyTo,
+                                                   byte[] keyFrom,
+                                                   byte[] keyTo,
+                                                   int scanType,
+                                                   byte[] query,
+                                                   byte[] position);
+
+        public abstract BackendColumnIterator scan(String table,
+                                                   int codeFrom,
+                                                   int codeTo,
+                                                   int scanType,
+                                                   byte[] query);
+
+        public abstract BackendColumnIterator scan(String table,
+                                                   int codeFrom,
+                                                   int codeTo,
+                                                   int scanType,
+                                                   byte[] query,
+                                                   byte[] position);
+
+        public abstract BackendColumnIterator getWithBatch(String table,
+                                                           List<HgOwnerKey> keys);
+
+        public abstract void merge(String table, byte[] ownerKey,
+                                   byte[] key, byte[] value);
+
+        public abstract void setMode(GraphMode mode);
+
+        public abstract void truncate() throws Exception;
+
+        public abstract BackendColumnIterator scan(String table,
+                                                   byte[] conditionQueryToByte);
+
+        public HugeConfig getConf() {
+            return conf;
+        }
+
+        public void setConf(HugeConfig conf) {
+            this.conf = conf;
+        }
+
+        public String getGraphName() {
+            return graphName;
+        }
+
+        public void setGraphName(String graphName) {
+            this.graphName = graphName;
+        }
+
+        public abstract void beginTx();
+
+        public abstract int getActiveStoreSize();
+    }
+}
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
new file mode 100755
index 000000000..e2ddfd97c
--- /dev/null
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
@@ -0,0 +1,802 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hugegraph.backend.query.Query;
+import org.apache.hugegraph.backend.store.BackendEntry;
+import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn;
+import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
+import org.apache.hugegraph.backend.store.BackendEntryIterator;
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+import org.apache.hugegraph.pd.client.PDClient;
+import org.apache.hugegraph.pd.client.PDConfig;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.store.HgKvEntry;
+import org.apache.hugegraph.store.HgKvIterator;
+import org.apache.hugegraph.store.HgOwnerKey;
+import org.apache.hugegraph.store.HgScanQuery;
+import org.apache.hugegraph.store.HgStoreClient;
+import org.apache.hugegraph.store.HgStoreSession;
+import org.apache.hugegraph.store.client.grpc.KvCloseableIterator;
+import org.apache.hugegraph.store.client.util.HgStoreClientConst;
+import org.apache.hugegraph.store.grpc.common.ScanOrderType;
+import org.apache.hugegraph.testutil.Assert;
+import org.apache.hugegraph.type.define.GraphMode;
+import org.apache.hugegraph.util.Bytes;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.StringEncoding;
+
+public class HstoreSessionsImpl extends HstoreSessions {
+
+    private static final Set<String> infoInitializedGraph =
+        Collections.synchronizedSet(new HashSet<>());
+    private static int tableCode = 0;
+    private static volatile Boolean initializedNode = Boolean.FALSE;
+    private static volatile PDClient defaultPdClient;
+    private static volatile HgStoreClient hgStoreClient;
+    private final HugeConfig config;
+    private final HstoreSession session;
+    private final Map<String, Integer> tables;
+    private final AtomicInteger refCount;
+    private final String graphName;
+
+    public HstoreSessionsImpl(HugeConfig config, String database,
+                              String store) {
+        super(config, database, store);
+        this.config = config;
+        this.graphName = database + "/" + store;
+        this.initStoreNode(config);
+        this.session = new HstoreSession(this.config, graphName);
+        this.tables = new ConcurrentHashMap<>();
+        this.refCount = new AtomicInteger(1);
+    }
+
+    public static HgStoreClient getHgStoreClient() {
+        return hgStoreClient;
+    }
+
+    public static PDClient getDefaultPdClient() {
+        return defaultPdClient;
+    }
+
+    public static byte[] encode(String string) {
+        return StringEncoding.encode(string);
+    }
+
+    public static String decode(byte[] bytes) {
+        return StringEncoding.decode(bytes);
+    }
+
+    private void initStoreNode(HugeConfig config) {
+        if (!initializedNode) {
+            synchronized (this) {
+                if (!initializedNode) {
+                    PDConfig pdConfig =
+                        PDConfig.of(config.get(CoreOptions.PD_PEERS))
+                                .setEnableCache(true);
+                    defaultPdClient = PDClient.create(pdConfig);
+                    hgStoreClient =
+                        HgStoreClient.create(defaultPdClient);
+                    initializedNode = Boolean.TRUE;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        if (!infoInitializedGraph.contains(this.graphName)) {
+            synchronized (infoInitializedGraph) {
+                if (!infoInitializedGraph.contains(this.graphName)) {
+                    Integer partitionCount =
+                        this.config.get(HstoreOptions.PARTITION_COUNT);
+                    Assert.assertTrue("The value of hstore.partition_count" +
+                                      " cannot be less than 0.",
+                                      partitionCount > -1);
+                    defaultPdClient.setGraph(Metapb.Graph.newBuilder()
+                                                         .setGraphName(
+                                                             this.graphName)
+                                                         .setPartitionCount(
+                                                             partitionCount)
+                                                         .build());
+                    infoInitializedGraph.add(this.graphName);
+                }
+            }
+        }
+        this.session.open();
+    }
+
+    @Override
+    protected boolean opened() {
+        return this.session != null;
+    }
+
+    @Override
+    public Set<String> openedTables() {
+        return this.tables.keySet();
+    }
+
+    @Override
+    public synchronized void createTable(String... tables) {
+        for (String table : tables) {
+            this.session.createTable(table);
+            this.tables.put(table, tableCode++);
+        }
+    }
+
+    @Override
+    public synchronized void dropTable(String... tables) {
+        for (String table : tables) {
+            this.session.dropTable(table);
+            this.tables.remove(table);
+        }
+    }
+
+    @Override
+    public boolean existsTable(String table) {
+        return this.session.existsTable(table);
+    }
+
+    @Override
+    public void truncateTable(String table) {
+        this.session.truncateTable(table);
+    }
+
+    @Override
+    public void clear() {
+        this.session.deleteGraph();
+        try {
+            hgStoreClient.getPdClient().delGraph(this.graphName);
+        } catch (PDException e) {
+
+        }
+    }
+
+    @Override
+    public final Session session() {
+        return (Session) super.getOrNewSession();
+    }
+
+    @Override
+    protected final Session newSession() {
+        return new HstoreSession(this.config(), this.graphName);
+    }
+
+    @Override
+    protected synchronized void doClose() {
+        this.checkValid();
+        if (this.refCount != null) {
+            if (this.refCount.decrementAndGet() > 0) {
+                return;
+            }
+            if (this.refCount.get() != 0) {
+                return;
+            }
+        }
+        assert this.refCount.get() == 0;
+        this.tables.clear();
+        this.session.close();
+    }
+
+    private void checkValid() {
+    }
+
+    private static class ColumnIterator<T extends HgKvIterator> implements
+                                                                BackendColumnIterator,
+                                                                Countable {
+
+        private final T iter;
+        private final byte[] keyBegin;
+        private final byte[] keyEnd;
+        private final int scanType;
+        private final String table;
+        private final byte[] value;
+        private boolean gotNext;
+        private byte[] position;
+
+        public ColumnIterator(String table, T results) {
+            this(table, results, null, null, 0);
+        }
+
+        public ColumnIterator(String table, T results, byte[] keyBegin,
+                              byte[] keyEnd, int scanType) {
+            E.checkNotNull(results, "results");
+            this.table = table;
+            this.iter = results;
+            this.keyBegin = keyBegin;
+            this.keyEnd = keyEnd;
+            this.scanType = scanType;
+            this.value = null;
+            if (this.iter.hasNext()) {
+                this.iter.next();
+                this.gotNext = true;
+                this.position = iter.position();
+            } else {
+                this.gotNext = false;
+                // QUESTION: Resetting the position may result in the caller 
being unable to
+                //           retrieve the corresponding position.
+                this.position = null;
+            }
+            if (!ArrayUtils.isEmpty(this.keyBegin) ||
+                !ArrayUtils.isEmpty(this.keyEnd)) {
+                this.checkArguments();
+            }
+
+        }
+
+        public T iter() {
+            return iter;
+        }
+
+        private void checkArguments() {
+            E.checkArgument(!(this.match(Session.SCAN_PREFIX_BEGIN) &&
+                              this.match(Session.SCAN_PREFIX_END)),
+                            "Can't set SCAN_PREFIX_WITH_BEGIN and " +
+                            "SCAN_PREFIX_WITH_END at the same time");
+
+            E.checkArgument(!(this.match(Session.SCAN_PREFIX_BEGIN) &&
+                              this.match(Session.SCAN_GT_BEGIN)),
+                            "Can't set SCAN_PREFIX_WITH_BEGIN and " +
+                            "SCAN_GT_BEGIN/SCAN_GTE_BEGIN at the same time");
+
+            E.checkArgument(!(this.match(Session.SCAN_PREFIX_END) &&
+                              this.match(Session.SCAN_LT_END)),
+                            "Can't set SCAN_PREFIX_WITH_END and " +
+                            "SCAN_LT_END/SCAN_LTE_END at the same time");
+
+            if (this.match(Session.SCAN_PREFIX_BEGIN) && !matchHash()) {
+                E.checkArgument(this.keyBegin != null,
+                                "Parameter `keyBegin` can't be null " +
+                                "if set SCAN_PREFIX_WITH_BEGIN");
+                E.checkArgument(this.keyEnd == null,
+                                "Parameter `keyEnd` must be null " +
+                                "if set SCAN_PREFIX_WITH_BEGIN");
+            }
+
+            if (this.match(Session.SCAN_PREFIX_END) && !matchHash()) {
+                E.checkArgument(this.keyEnd != null,
+                                "Parameter `keyEnd` can't be null " +
+                                "if set SCAN_PREFIX_WITH_END");
+            }
+
+            if (this.match(Session.SCAN_GT_BEGIN) && !matchHash()) {
+                E.checkArgument(this.keyBegin != null,
+                                "Parameter `keyBegin` can't be null " +
+                                "if set SCAN_GT_BEGIN or SCAN_GTE_BEGIN");
+            }
+
+            if (this.match(Session.SCAN_LT_END) && !matchHash()) {
+                E.checkArgument(this.keyEnd != null,
+                                "Parameter `keyEnd` can't be null " +
+                                "if set SCAN_LT_END or SCAN_LTE_END");
+            }
+        }
+
+        private boolean matchHash() {
+            return this.scanType == Session.SCAN_HASHCODE;
+        }
+
+        private boolean match(int expected) {
+            return Session.matchScanType(expected, this.scanType);
+        }
+
+
+        @Override
+        public boolean hasNext() {
+            if (gotNext) {
+                this.position = this.iter.position();
+            } else {
+                // QUESTION: Resetting the position may result in the caller 
being unable to
+                //           retrieve the corresponding position.
+                this.position = null;
+            }
+            return gotNext;
+        }
+
+        private boolean filter(byte[] key) {
+            if (this.match(Session.SCAN_PREFIX_BEGIN)) {
+                /*
+                 * Prefix with `keyBegin`?
+                 * TODO: use custom prefix_extractor instead
+                 *       or use ReadOptions.prefix_same_as_start
+                 */
+                return Bytes.prefixWith(key, this.keyBegin);
+            } else if (this.match(Session.SCAN_PREFIX_END)) {
+                /*
+                 * Prefix with `keyEnd`?
+                 * like the following query for range index:
+                 *  key > 'age:20' and prefix with 'age'
+                 */
+                assert this.keyEnd != null;
+                return Bytes.prefixWith(key, this.keyEnd);
+            } else if (this.match(Session.SCAN_LT_END)) {
+                /*
+                 * Less (equal) than `keyEnd`?
+                 * NOTE: don't use BytewiseComparator due to signed byte
+                 */
+                if ((this.scanType | Session.SCAN_HASHCODE) != 0) {
+                    return true;
+                }
+                assert this.keyEnd != null;
+                if (this.match(Session.SCAN_LTE_END)) {
+                    // Just compare the prefix, can be there are excess tail
+                    key = Arrays.copyOfRange(key, 0, this.keyEnd.length);
+                    return Bytes.compare(key, this.keyEnd) <= 0;
+                } else {
+                    return Bytes.compare(key, this.keyEnd) < 0;
+                }
+            } else {
+                assert this.match(Session.SCAN_ANY) || 
this.match(Session.SCAN_GT_BEGIN) ||
+                       this.match(
+                           Session.SCAN_GTE_BEGIN) : "Unknown scan type";
+                return true;
+            }
+        }
+
+        @Override
+        public BackendColumn next() {
+            BackendEntryIterator.checkInterrupted();
+            if (!this.hasNext()) {
+                throw new NoSuchElementException();
+            }
+            BackendColumn col =
+                BackendColumn.of(this.iter.key(),
+                                 this.iter.value());
+            if (this.iter.hasNext()) {
+                gotNext = true;
+                this.iter.next();
+            } else {
+                gotNext = false;
+            }
+            return col;
+        }
+
+        @Override
+        public long count() {
+            long count = 0L;
+            while (this.hasNext()) {
+                this.next();
+                count++;
+                BackendEntryIterator.checkInterrupted();
+            }
+            return count;
+        }
+
+        @Override
+        public byte[] position() {
+            return this.position;
+        }
+
+        @Override
+        public void close() {
+            if (this.iter != null) {
+                this.iter.close();
+            }
+        }
+    }
+
    /**
     * {@code Session} implementation backed by an hstore {@link HgStoreSession}.
     * Write operations (put/delete/merge) are wrapped in a store-side
     * transaction when {@code TRANSACTIONAL} and flushed on {@link #commit()}.
     */
    private final class HstoreSession extends Session {

        private static final boolean TRANSACTIONAL = true;
        // Store-side session handle for this graph
        private final HgStoreSession graph;
        // Number of uncommitted put/delete/merge operations in this session
        int changedSize = 0;

        public HstoreSession(HugeConfig conf, String graphName) {
            setGraphName(graphName);
            setConf(conf);
            this.graph = hgStoreClient.openSession(graphName);
        }

        @Override
        public void open() {
            // Only flips the local flag; the hstore session is opened eagerly
            // in the constructor
            this.opened = true;
        }

        @Override
        public void close() {
            this.opened = false;
        }

        @Override
        public boolean closed() {
            return !this.opened;
        }

        @Override
        public void reset() {
            // Discard any uncommitted changes
            if (this.changedSize != 0) {
                this.rollback();
                this.changedSize = 0;
            }
        }

        /**
         * Any change in the session
         */
        @Override
        public boolean hasChanges() {
            return this.changedSize > 0;
        }

        /**
         * Commit all updates(put/delete) to DB
         */
        @Override
        public Integer commit() {
            if (!this.hasChanges()) {
                // TODO: log a message with level WARNING
                return 0;
            }
            int commitSize = this.changedSize;
            if (TRANSACTIONAL) {
                this.graph.commit();
            }
            this.changedSize = 0;
            return commitSize;
        }

        /**
         * Rollback all updates(put/delete) not committed
         */
        @Override
        public void rollback() {
            if (TRANSACTIONAL) {
                this.graph.rollback();
            }
            this.changedSize = 0;
        }

        @Override
        public void createTable(String tableName) {
            this.graph.createTable(tableName);
        }

        @Override
        public void dropTable(String tableName) {
            this.graph.dropTable(tableName);
        }

        @Override
        public boolean existsTable(String tableName) {
            return this.graph.existsTable(tableName);
        }

        @Override
        public void truncateTable(String tableName) {
            // NOTE(review): truncate maps onto deleteTable (remove all data)
            // while dropTable() above removes the table itself — confirm this
            // matches HgStoreSession semantics
            this.graph.deleteTable(tableName);
        }

        @Override
        public void deleteGraph() {
            this.graph.deleteGraph(this.getGraphName());
        }

        @Override
        public Pair<byte[], byte[]> keyRange(String table) {
            // Key ranges are not supported by this backend
            return null;
        }

        // Start a transaction lazily on the first change and count the change
        private void prepare() {
            if (!this.hasChanges() && TRANSACTIONAL) {
                this.graph.beginTx();
            }
            this.changedSize++;
        }

        /**
         * Add a KV record to a table
         */
        @Override
        public void put(String table, byte[] ownerKey, byte[] key,
                        byte[] value) {
            prepare();
            this.graph.put(table, HgOwnerKey.of(ownerKey, key), value);
        }

        @Override
        public synchronized void increase(String table, byte[] ownerKey,
                                          byte[] key, byte[] value) {
            // Implemented via store-side merge of the value
            prepare();
            this.graph.merge(table, HgOwnerKey.of(ownerKey, key), value);
        }

        @Override
        public void delete(String table, byte[] ownerKey, byte[] key) {
            prepare();
            this.graph.delete(table, HgOwnerKey.of(ownerKey, key));
        }

        @Override
        public void deletePrefix(String table, byte[] ownerKey, byte[] key) {
            // Delete every record whose key starts with `key`
            prepare();
            this.graph.deletePrefix(table, HgOwnerKey.of(ownerKey, key));
        }

        /**
         * Delete a range of keys from a table
         */
        @Override
        public void deleteRange(String table, byte[] ownerKeyFrom,
                                byte[] ownerKeyTo, byte[] keyFrom,
                                byte[] keyTo) {
            prepare();
            this.graph.deleteRange(table, HgOwnerKey.of(ownerKeyFrom, keyFrom),
                                   HgOwnerKey.of(ownerKeyTo, keyTo));
        }

        @Override
        public byte[] get(String table, byte[] key) {
            // NOTE(review): may return null, while the 3-arg get() below
            // normalizes null to an empty array — confirm callers handle both
            return this.graph.get(table, HgOwnerKey.of(
                HgStoreClientConst.ALL_PARTITION_OWNER, key));
        }

        @Override
        public byte[] get(String table, byte[] ownerKey, byte[] key) {
            byte[] values = this.graph.get(table, HgOwnerKey.of(ownerKey, key));
            return values != null ? values : new byte[0];
        }

        @Override
        public void beginTx() {
            this.graph.beginTx();
        }

        // Full-table scan
        @Override
        public BackendColumnIterator scan(String table) {
            assert !this.hasChanges();
            return new ColumnIterator<>(table, this.graph.scanIterator(table));
        }

        // Scan filtered by a serialized condition query
        @Override
        public BackendColumnIterator scan(String table,
                                          byte[] conditionQueryToByte) {
            assert !this.hasChanges();
            HgKvIterator results =
                this.graph.scanIterator(table, conditionQueryToByte);
            return new ColumnIterator<>(table, results);
        }

        // Prefix scan within a single owner (partition key)
        @Override
        public BackendColumnIterator scan(String table, byte[] ownerKey,
                                          byte[] prefix) {
            assert !this.hasChanges();
            HgKvIterator<HgKvEntry> result =
                this.graph.scanIterator(table, HgOwnerKey.of(ownerKey, prefix));
            return new ColumnIterator<>(table, result);
        }

        // Batched prefix scan: one iterator per underlying batch result
        @Override
        public List<BackendColumnIterator> scan(String table,
                                                List<HgOwnerKey> keys,
                                                int scanType, long limit,
                                                byte[] query) {
            HgScanQuery scanQuery = HgScanQuery.prefixOf(table, keys).builder()
                                               .setScanType(scanType)
                                               .setQuery(query)
                                               .setPerKeyLimit(limit).build();
            List<HgKvIterator<HgKvEntry>> scanIterators =
                this.graph.scanBatch(scanQuery);
            LinkedList<BackendColumnIterator> columnIterators =
                new LinkedList<>();
            scanIterators.forEach(item -> {
                columnIterators.add(
                    new ColumnIterator<>(table, item));
            });
            return columnIterators;
        }

        // Streaming batched prefix scan honoring order/limit/skip-degree
        // options from the query
        @Override
        public BackendEntry.BackendIterator<BackendColumnIterator> scan(
            String table,
            Iterator<HgOwnerKey> keys,
            int scanType, Query queryParam, byte[] query) {
            ScanOrderType orderType;
            switch (queryParam.orderType()) {
                case ORDER_NONE:
                    orderType = ScanOrderType.ORDER_NONE;
                    break;
                case ORDER_WITHIN_VERTEX:
                    orderType = ScanOrderType.ORDER_WITHIN_VERTEX;
                    break;
                case ORDER_STRICT:
                    orderType = ScanOrderType.ORDER_STRICT;
                    break;
                default:
                    throw new RuntimeException("not implement");
            }
            HgScanQuery scanQuery = HgScanQuery.prefixIteratorOf(table, keys)
                                               .builder()
                                               .setScanType(scanType)
                                               .setQuery(query)
                                               .setPerKeyMax(queryParam.limit())
                                               .setOrderType(orderType)
                                               .setOnlyKey(
                                                       !queryParam.withProperties())
                                               .setSkipDegree(
                                                       queryParam.skipDegree())
                                               .build();
            KvCloseableIterator<HgKvIterator<HgKvEntry>> scanIterators =
                    this.graph.scanBatch2(scanQuery);
            // Lazily wrap each batch iterator as it is pulled
            return new BackendEntry.BackendIterator<BackendColumnIterator>() {
                @Override
                public void close() {
                    scanIterators.close();
                }

                @Override
                public byte[] position() {
                    // Positions are not supported for batched scans
                    throw new NotImplementedException();
                }

                @Override
                public boolean hasNext() {
                    return scanIterators.hasNext();
                }

                @Override
                public BackendColumnIterator next() {
                    return new ColumnIterator<HgKvIterator>(table,
                                                            scanIterators.next());
                }
            };
        }

        // Range scan between two owner/key pairs
        @Override
        public BackendColumnIterator scan(String table, byte[] ownerKeyFrom,
                                          byte[] ownerKeyTo,
                                          byte[] keyFrom, byte[] keyTo,
                                          int scanType) {
            assert !this.hasChanges();
            HgKvIterator result = this.graph.scanIterator(
                table,
                HgOwnerKey.of(ownerKeyFrom, keyFrom),
                HgOwnerKey.of(ownerKeyTo, keyTo), 0,
                scanType,
                null);
            return new ColumnIterator<>(table, result, keyFrom,
                                        keyTo, scanType);
        }

        // Range scan with an extra serialized query filter
        @Override
        public BackendColumnIterator scan(String table, byte[] ownerKeyFrom,
                                          byte[] ownerKeyTo,
                                          byte[] keyFrom, byte[] keyTo,
                                          int scanType, byte[] query) {
            assert !this.hasChanges();
            HgKvIterator<HgKvEntry> result = this.graph.scanIterator(
                table,
                HgOwnerKey.of(ownerKeyFrom, keyFrom),
                HgOwnerKey.of(ownerKeyTo, keyTo),
                0,
                scanType,
                query);
            return new ColumnIterator<>(table, result, keyFrom, keyTo,
                                        scanType);
        }

        // Range scan resuming from a previously returned position
        @Override
        public BackendColumnIterator scan(String table, byte[] ownerKeyFrom,
                                          byte[] ownerKeyTo,
                                          byte[] keyFrom, byte[] keyTo,
                                          int scanType, byte[] query,
                                          byte[] position) {
            assert !this.hasChanges();
            HgKvIterator<HgKvEntry> result = this.graph.scanIterator(
                table,
                HgOwnerKey.of(ownerKeyFrom, keyFrom),
                HgOwnerKey.of(ownerKeyTo, keyTo),
                0,
                scanType,
                query);
            result.seek(position);
            return new ColumnIterator<>(table, result, keyFrom, keyTo,
                                        scanType);
        }

        // Scan by hashcode range
        @Override
        public BackendColumnIterator scan(String table, int codeFrom,
                                          int codeTo, int scanType,
                                          byte[] query) {
            assert !this.hasChanges();
            // NOTE(review): the `query` parameter is ignored and the limit is
            // hard-coded to 256 — confirm this is intentional
            HgKvIterator<HgKvEntry> iterator =
                this.graph.scanIterator(table, codeFrom, codeTo, 256,
                                        new byte[0]);
            return new ColumnIterator<>(table, iterator, new byte[0],
                                        new byte[0], scanType);
        }

        // Scan by hashcode range, resuming from a position
        @Override
        public BackendColumnIterator scan(String table, int codeFrom,
                                          int codeTo, int scanType,
                                          byte[] query, byte[] position) {
            assert !this.hasChanges();
            // NOTE(review): `query` ignored and limit hard-coded to 256 here
            // as well — confirm intentional
            HgKvIterator<HgKvEntry> iterator =
                this.graph.scanIterator(table, codeFrom, codeTo, 256,
                                        new byte[0]);
            iterator.seek(position);
            return new ColumnIterator<>(table, iterator, new byte[0],
                                        new byte[0], scanType);
        }

        // Batched point/prefix lookup for a list of owner keys
        @Override
        public BackendColumnIterator getWithBatch(String table,
                                                  List<HgOwnerKey> keys) {
            assert !this.hasChanges();
            HgKvIterator<HgKvEntry> kvIterator =
                this.graph.batchPrefix(table, keys);
            return new ColumnIterator<>(table, kvIterator);
        }

        @Override
        public void merge(String table, byte[] ownerKey, byte[] key,
                          byte[] value) {
            prepare();
            this.graph.merge(table, HgOwnerKey.of(ownerKey, key), value);
        }

        @Override
        public void setMode(GraphMode mode) {
            // no need to set pd mode
        }

        @Override
        public void truncate() throws Exception {
            // Wipe the graph data and reset the PD-managed id counter
            this.graph.truncate();
            HstoreSessionsImpl.getDefaultPdClient()
                              .resetIdByKey(this.getGraphName());
        }

        @Override
        public int getActiveStoreSize() {
            try {
                return defaultPdClient.getActiveStores().size();
            } catch (PDException ignore) {
                // Best-effort: fall through and report 0 active stores
            }
            return 0;
        }
    }
+}
diff --git 
a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreStore.java
 
b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreStore.java
new file mode 100644
index 000000000..1127d122e
--- /dev/null
+++ 
b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreStore.java
@@ -0,0 +1,825 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.id.IdGenerator;
+import org.apache.hugegraph.backend.query.IdPrefixQuery;
+import org.apache.hugegraph.backend.query.IdQuery;
+import org.apache.hugegraph.backend.query.Query;
+import org.apache.hugegraph.backend.serializer.BinaryBackendEntry;
+import org.apache.hugegraph.backend.serializer.BytesBuffer;
+import org.apache.hugegraph.backend.serializer.MergeIterator;
+import org.apache.hugegraph.backend.store.AbstractBackendStore;
+import org.apache.hugegraph.backend.store.BackendAction;
+import org.apache.hugegraph.backend.store.BackendEntry;
+import org.apache.hugegraph.backend.store.BackendFeatures;
+import org.apache.hugegraph.backend.store.BackendMutation;
+import org.apache.hugegraph.backend.store.BackendStoreProvider;
+import org.apache.hugegraph.backend.store.BackendTable;
+import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Session;
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+import org.apache.hugegraph.iterator.CIter;
+import org.apache.hugegraph.type.HugeTableType;
+import org.apache.hugegraph.type.HugeType;
+import org.apache.hugegraph.type.define.Action;
+import org.apache.hugegraph.type.define.GraphMode;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+
+public abstract class HstoreStore extends AbstractBackendStore<Session> {
+
    private static final Logger LOG = Log.logger(HstoreStore.class);

    // All index HugeTypes; they share the single ALL_INDEX_TABLE (see table())
    private static final Set<HugeType> INDEX_TYPES = ImmutableSet.of(
        HugeType.SECONDARY_INDEX, HugeType.VERTEX_LABEL_INDEX,
        HugeType.EDGE_LABEL_INDEX, HugeType.RANGE_INT_INDEX,
        HugeType.RANGE_FLOAT_INDEX, HugeType.RANGE_LONG_INDEX,
        HugeType.RANGE_DOUBLE_INDEX, HugeType.SEARCH_INDEX,
        HugeType.SHARD_INDEX, HugeType.UNIQUE_INDEX
    );

    private static final BackendFeatures FEATURES = new HstoreFeatures();
    // `store` is this store's name; `namespace` the database it belongs to
    private final String store, namespace;

    private final BackendStoreProvider provider;
    // HugeTableType.code() -> table handler
    private final Map<Integer, HstoreTable> tables;
    // Guards store-level open/close state transitions
    private final ReadWriteLock storeLock;
    // Whether this store is the graph store (set in open())
    private boolean isGraphStore;
    private HstoreSessions sessions;
+
+    public HstoreStore(final BackendStoreProvider provider,
+                       String namespace, String store) {
+        this.tables = new HashMap<>();
+        this.provider = provider;
+        this.namespace = namespace;
+        this.store = store;
+        this.sessions = null;
+        this.storeLock = new ReentrantReadWriteLock();
+        this.registerMetaHandlers();
+        LOG.debug("Store loaded: {}", store);
+    }
+
    // Register handlers for backend meta operations: "metrics" and "mode"
    private void registerMetaHandlers() {
        // Lazily collect the sessions backing this store for metrics queries
        Supplier<List<HstoreSessions>> dbsGet = () -> {
            List<HstoreSessions> dbs = new ArrayList<>();
            dbs.add(this.sessions);
            return dbs;
        };
        this.registerMetaHandler("metrics", (session, meta, args) -> {
            HstoreMetrics metrics = new HstoreMetrics(dbsGet.get(), session);
            return metrics.metrics();
        });
        this.registerMetaHandler("mode", (session, meta, args) -> {
            E.checkArgument(args.length == 1,
                            "The args count of %s must be 1", meta);
            session.setMode((GraphMode) args[0]);
            return null;
        });
    }
+
+    protected void registerTableManager(HugeTableType type, HstoreTable table) 
{
+        this.tables.put((int) type.code(), table);
+    }
+
+    @Override
+    protected final HstoreTable table(HugeType type) {
+        assert type != null;
+        HugeTableType table;
+        switch (type) {
+            case VERTEX:
+                table = HugeTableType.VERTEX;
+                break;
+            case EDGE_OUT:
+                table = HugeTableType.OUT_EDGE;
+                break;
+            case EDGE_IN:
+                table = HugeTableType.IN_EDGE;
+                break;
+            case OLAP:
+                table = HugeTableType.OLAP_TABLE;
+                break;
+            case TASK:
+                table = HugeTableType.TASK_INFO_TABLE;
+                break;
+            case SERVER:
+                table = HugeTableType.SERVER_INFO_TABLE;
+                break;
+            case SEARCH_INDEX:
+            case SHARD_INDEX:
+            case SECONDARY_INDEX:
+            case RANGE_INT_INDEX:
+            case RANGE_LONG_INDEX:
+            case RANGE_FLOAT_INDEX:
+            case RANGE_DOUBLE_INDEX:
+            case EDGE_LABEL_INDEX:
+            case VERTEX_LABEL_INDEX:
+            case UNIQUE_INDEX:
+                table = HugeTableType.ALL_INDEX_TABLE;
+                break;
+            default:
+                throw new AssertionError(String.format(
+                    "Invalid type: %s", type));
+        }
+        return this.tables.get((int) table.code());
+    }
+
+    protected List<String> tableNames() {
+        return this.tables.values().stream()
+                          .map(BackendTable::table)
+                          .collect(Collectors.toList());
+    }
+
+    @Override
+    protected Session session(HugeType type) {
+        this.checkOpened();
+        return this.sessions.session();
+    }
+
    // The database / graph space this store belongs to
    public String namespace() {
        return this.namespace;
    }

    // The store name (e.g. graph / schema / system)
    @Override
    public String store() {
        return this.store;
    }

    // Database name is the namespace for this backend
    @Override
    public String database() {
        return this.namespace;
    }

    @Override
    public BackendStoreProvider provider() {
        return this.provider;
    }

    // Backend feature flags are shared by all hstore stores
    @Override
    public BackendFeatures features() {
        return FEATURES;
    }
+
+    @Override
+    public synchronized void open(HugeConfig config) {
+        E.checkNotNull(config, "config");
+
+        if (this.sessions == null) {
+            this.sessions = new HstoreSessionsImpl(config, this.namespace,
+                                                   this.store);
+        }
+
+        String graphStore = config.get(CoreOptions.STORE_GRAPH);
+        this.isGraphStore = this.store.equals(graphStore);
+        assert this.sessions != null;
+        if (!this.sessions.closed()) {
+            LOG.debug("Store {} has been opened before", this.store);
+            this.sessions.useSession();
+            return;
+        }
+
+        try {
+            // NOTE: won't throw error even if connection refused
+            this.sessions.open();
+        } catch (Exception e) {
+            LOG.error("Failed to open Hstore '{}':{}", this.store, e);
+        }
+        this.sessions.session();
+        LOG.debug("Store opened: {}", this.store);
+    }
+
+    @Override
+    public void close() {
+        this.checkOpened();
+        this.sessions.close();
+
+        LOG.debug("Store closed: {}", this.store);
+    }
+
+    @Override
+    public boolean opened() {
+        this.checkConnectionOpened();
+        return this.sessions.session().opened();
+    }
+
+    @Override
+    public void mutate(BackendMutation mutation) {
+        Session session = this.sessions.session();
+        assert session.opened();
+        Map<HugeType, Map<Id, List<BackendAction>>> mutations = 
mutation.mutations();
+        Set<Map.Entry<HugeType, Map<Id, List<BackendAction>>>> entries = 
mutations.entrySet();
+        for (Map.Entry<HugeType, Map<Id, List<BackendAction>>> entry : 
entries) {
+            HugeType key = entry.getKey();
+            // in order to obtain the owner efficiently, special for edge
+            boolean isEdge = key.isEdge();
+            HstoreTable hTable = this.table(key);
+            Map<Id, List<BackendAction>> table = entry.getValue();
+            Collection<List<BackendAction>> values = table.values();
+            for (List<BackendAction> items : values) {
+                for (int i = 0; i < items.size(); i++) {
+                    BackendAction item = items.get(i);
+                    // set to ArrayList, use index to get item
+                    this.mutate(session, item, hTable, isEdge);
+                }
+            }
+        }
+    }
+
+    private void mutate(Session session, BackendAction item,
+                        HstoreTable hTable, boolean isEdge) {
+        BackendEntry entry = item.entry();
+        HstoreTable table;
+        if (!entry.olap()) {
+            // Oltp table
+            table = hTable;
+        } else {
+            if (entry.type().isIndex()) {
+                // Olap index
+                table = this.table(entry.type());
+            } else {
+                // Olap vertex
+                table = this.table(HugeType.OLAP);
+            }
+            session = this.session(HugeType.OLAP);
+        }
+
+        if (item.action().code() == Action.INSERT.code()) {
+            table.insert(session, entry, isEdge);
+        } else {
+            if (item.action().code() == Action.APPEND.code()) {
+                table.append(session, entry);
+            } else {
+                switch (item.action()) {
+                    case DELETE:
+                        table.delete(session, entry);
+                        break;
+                    case ELIMINATE:
+                        table.eliminate(session, entry);
+                        break;
+                    case UPDATE_IF_PRESENT:
+                        table.updateIfPresent(session, entry);
+                        break;
+                    case UPDATE_IF_ABSENT:
+                        table.updateIfAbsent(session, entry);
+                        break;
+                    default:
+                        throw new AssertionError(String.format(
+                            "Unsupported mutate action: %s",
+                            item.action()));
+                }
+            }
+        }
+    }
+
+    private HstoreTable getTableByQuery(Query query) {
+        HugeType tableType = HstoreTable.tableType(query);
+        HstoreTable table;
+        if (query.olap()) {
+            if (query.resultType().isIndex()) {
+                // Any index type is ok here
+                table = this.table(HugeType.SECONDARY_INDEX);
+            } else {
+                table = this.table(HugeType.OLAP);
+            }
+        } else {
+            table = this.table(tableType);
+        }
+        return table;
+    }
+
+    @Override
+    public Iterator<BackendEntry> query(Query query) {
+        Lock readLock = this.storeLock.readLock();
+        readLock.lock();
+        try {
+            this.checkOpened();
+            Session session = this.sessions.session();
+            HstoreTable table = getTableByQuery(query);
+            Iterator<BackendEntry> entries = table.query(session, query);
+            // Merge olap results as needed
+            entries = getBackendEntryIterator(entries, query);
+            return entries;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    // TODO: uncomment later - sub edge labels
+    //@Override
+    //public Iterator<Iterator<BackendEntry>> query(Iterator<Query> queries,
+    //                                              Function<Query, Query> 
queryWriter,
+    //                                              HugeGraph hugeGraph) {
+    //    if (queries == null || !queries.hasNext()) {
+    //        return Collections.emptyIterator();
+    //    }
+    //
+    //    class QueryWrapper implements Iterator<IdPrefixQuery> {
+    //        Query first;
+    //        final Iterator<Query> queries;
+    //        Iterator<Id> subEls;
+    //        Query preQuery;
+    //        Iterator<IdPrefixQuery> queryListIterator;
+    //
+    //        QueryWrapper(Iterator<Query> queries, Query first) {
+    //            this.queries = queries;
+    //            this.first = first;
+    //        }
+    //
+    //        @Override
+    //        public boolean hasNext() {
+    //            return first != null || (this.subEls != null && 
this.subEls.hasNext())
+    //                   || (queryListIterator != null && 
queryListIterator.hasNext()) ||
+    //                   queries.hasNext();
+    //        }
+    //
+    //        @Override
+    //        public IdPrefixQuery next() {
+    //            if (queryListIterator != null && 
queryListIterator.hasNext()) {
+    //                return queryListIterator.next();
+    //            }
+    //
+    //            Query q;
+    //            if (first != null) {
+    //                q = first;
+    //                preQuery = q.copy();
+    //                first = null;
+    //            } else {
+    //                if (this.subEls == null || !this.subEls.hasNext()) {
+    //                    q = queries.next();
+    //                    preQuery = q.copy();
+    //                } else {
+    //                    q = preQuery.copy();
+    //                }
+    //            }
+    //
+    //            assert q instanceof ConditionQuery;
+    //            ConditionQuery cq = (ConditionQuery) q;
+    //            ConditionQuery originQuery = (ConditionQuery) q.copy();
+    //
+    //            List<IdPrefixQuery> queryList = Lists.newArrayList();
+    //            if (hugeGraph != null) {
+    //                for (ConditionQuery conditionQuery :
+    //                    ConditionQueryFlatten.flatten(cq)) {
+    //                    Id label = conditionQuery.condition(HugeKeys.LABEL);
+    //                 /* Parent label + sortKeys: a query like g.V("V.id")
+    //                  * .outE("parentLabel").has("sortKey","value") is
+    //                  * rewritten into all child labels + sortKeys */
+    //                    if ((this.subEls == null ||
+    //                         !this.subEls.hasNext()) && label != null &&
+    //                        hugeGraph.edgeLabel(label).isFather() &&
+    //                        conditionQuery.condition(HugeKeys.SUB_LABEL) ==
+    //                        null &&
+    //                        conditionQuery.condition(HugeKeys.OWNER_VERTEX) 
!=
+    //                        null &&
+    //                        conditionQuery.condition(HugeKeys.DIRECTION) !=
+    //                        null &&
+    //                        matchEdgeSortKeys(conditionQuery, false,
+    //                                          hugeGraph)) {
+    //                        this.subEls =
+    //                            getSubLabelsOfParentEl(
+    //                                hugeGraph.edgeLabels(),
+    //                                label);
+    //                    }
+    //
+    //                    if (this.subEls != null &&
+    //                        this.subEls.hasNext()) {
+    //                        conditionQuery.eq(HugeKeys.SUB_LABEL,
+    //                                          subEls.next());
+    //                    }
+    //
+    //                    HugeType hugeType = conditionQuery.resultType();
+    //                    if (hugeType != null && hugeType.isEdge() &&
+    //                        !conditionQuery.conditions().isEmpty()) {
+    //                        IdPrefixQuery idPrefixQuery =
+    //                            (IdPrefixQuery) queryWriter.apply(
+    //                                conditionQuery);
+    //                        idPrefixQuery.setOriginQuery(originQuery);
+    //                        queryList.add(idPrefixQuery);
+    //                    }
+    //                }
+    //
+    //                queryListIterator = queryList.iterator();
+    //                if (queryListIterator.hasNext()) {
+    //                    return queryListIterator.next();
+    //                }
+    //            }
+    //
+    //            Id ownerId = cq.condition(HugeKeys.OWNER_VERTEX);
+    //            assert ownerId != null;
+    //            BytesBuffer buffer =
+    //                BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
+    //            buffer.writeId(ownerId);
+    //            return new IdPrefixQuery(cq, new BinaryBackendEntry.BinaryId(
+    //                buffer.bytes(), ownerId));
+    //        }
+    //
+    //        private boolean matchEdgeSortKeys(ConditionQuery query,
+    //                                          boolean matchAll,
+    //                                          HugeGraph graph) {
+    //            assert query.resultType().isEdge();
+    //            Id label = query.condition(HugeKeys.LABEL);
+    //            if (label == null) {
+    //                return false;
+    //            }
+    //            List<Id> sortKeys = graph.edgeLabel(label).sortKeys();
+    //            if (sortKeys.isEmpty()) {
+    //                return false;
+    //            }
+    //            Set<Id> queryKeys = query.userpropKeys();
+    //            for (int i = sortKeys.size(); i > 0; i--) {
+    //                List<Id> subFields = sortKeys.subList(0, i);
+    //                if (queryKeys.containsAll(subFields)) {
+    //                    if (queryKeys.size() == subFields.size() || 
!matchAll) {
+    //                        /*
+    //                         * Return true if:
+    //                         * matchAll=true and all queryKeys are in 
sortKeys
+    //                         *  or
+    //                         * partial queryKeys are in sortKeys
+    //                         */
+    //                        return true;
+    //                    }
+    //                }
+    //            }
+    //            return false;
+    //        }
+    //    }
+    //    Query first = queries.next();
+    //    List<HugeType> typeList = getHugeTypes(first);
+    //    QueryWrapper idPrefixQueries = new QueryWrapper(queries, first);
+    //
+    //    return query(typeList, idPrefixQueries);
+    //}
+
+    //private Iterator<Id> getSubLabelsOfParentEl(Collection<EdgeLabel> allEls,
+    //                                            Id label) {
+    //    List<Id> list = new ArrayList<>();
+    //    for (EdgeLabel el : allEls) {
+    //        if (el.edgeLabelType().sub() && el.fatherId().equals(label)) {
+    //            list.add(el.id());
+    //        }
+    //    }
+    //    return list.iterator();
+    //}
+
+    public List<CIter<BackendEntry>> query(List<HugeType> typeList,
+                                           List<IdPrefixQuery> queries) {
+        Lock readLock = this.storeLock.readLock();
+        readLock.lock();
+        LinkedList<CIter<BackendEntry>> results = new LinkedList<>();
+        try {
+            this.checkOpened();
+            Session session = this.sessions.session();
+            E.checkState(!CollectionUtils.isEmpty(queries) &&
+                         !CollectionUtils.isEmpty(typeList),
+                         "Please check query list or type list.");
+            HstoreTable table = null;
+            StringBuilder builder = new StringBuilder();
+            for (HugeType type : typeList) {
+                builder.append((table = this.table(type)).table()).append(",");
+            }
+            List<Iterator<BackendEntry>> iteratorList =
+                table.query(session, queries,
+                            builder.substring(0, builder.length() - 1));
+            for (int i = 0; i < iteratorList.size(); i++) {
+                Iterator<BackendEntry> entries = iteratorList.get(i);
+                // Merge olap results as needed
+                Query query = queries.get(i);
+                entries = getBackendEntryIterator(entries, query);
+                if (entries instanceof CIter) {
+                    results.add((CIter) entries);
+                }
+            }
+            return results;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public Iterator<Iterator<BackendEntry>> query(List<HugeType> typeList,
+                                                  Iterator<IdPrefixQuery> 
queries) {
+        Lock readLock = this.storeLock.readLock();
+        readLock.lock();
+        try {
+            this.checkOpened();
+            Session session = this.sessions.session();
+            E.checkState(queries.hasNext() &&
+                         !CollectionUtils.isEmpty(typeList),
+                         "Please check query list or type list.");
+            HstoreTable table = null;
+            StringBuilder builder = new StringBuilder();
+            for (HugeType type : typeList) {
+                builder.append((table = this.table(type)).table()).append(",");
+            }
+
+            Iterator<Iterator<BackendEntry>> iterators =
+                table.query(session, queries,
+                            builder.substring(0, builder.length() - 1));
+
+            return iterators;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    private Iterator<BackendEntry> getBackendEntryIterator(
+        Iterator<BackendEntry> entries,
+        Query query) {
+        HstoreTable table;
+        Set<Id> olapPks = query.olapPks();
+        if (this.isGraphStore && !olapPks.isEmpty()) {
+            List<Iterator<BackendEntry>> iterators = new ArrayList<>();
+            for (Id pk : olapPks) {
+                // 构造olap表查询query condition
+                Query q = this.constructOlapQueryCondition(pk, query);
+                table = this.table(HugeType.OLAP);
+                iterators.add(table.queryOlap(this.session(HugeType.OLAP), q));
+            }
+            entries = new MergeIterator<>(entries, iterators,
+                                          BackendEntry::mergeable);
+        }
+        return entries;
+    }
+
+
    /**
     * Rebuild the query condition for the merged OLAP table.
     *
     * All OLAP data is merged into a single table; when OLAP vertices are
     * written, the property-key id (pk) is prepended to the key (see
     * BinarySerializer.writeOlapVertex), so reads must rebuild the same
     * pk-prefixed keys here.
     *
     * @param pk    property-key id used as the key prefix
     * @param query the original query
     * @return an IdQuery whose ids are pk-prefixed, or an IdPrefixQuery that
     *         scans the pk prefix when no explicit ids are given
     */
    private Query constructOlapQueryCondition(Id pk, Query query) {
        if (query instanceof IdQuery && !CollectionUtils.isEmpty((query).ids())) {
            IdQuery q = (IdQuery) query.copy();
            Iterator<Id> iterator = q.ids().iterator();
            LinkedHashSet<Id> linkedHashSet = new LinkedHashSet<>();
            while (iterator.hasNext()) {
                Id id = iterator.next();
                if (id instanceof BinaryBackendEntry.BinaryId) {
                    // Unwrap to the raw origin id before re-prefixing
                    id = ((BinaryBackendEntry.BinaryId) id).origin();
                }

                // create binary id: [pk][id]; the extra 1 byte per id is
                // presumably a length/type byte written by writeId — confirm
                BytesBuffer buffer =
                    BytesBuffer.allocate(1 + pk.length() + 1 + id.length());
                buffer.writeId(pk);
                id = new BinaryBackendEntry.BinaryId(
                    buffer.writeId(id).bytes(), id);
                linkedHashSet.add(id);
            }
            // Replace the original ids with the pk-prefixed ones (order kept)
            q.resetIds();
            q.query(linkedHashSet);
            return q;
        } else {
            // create binary id: no explicit ids, prefix-scan on [pk] alone
            BytesBuffer buffer = BytesBuffer.allocate(1 + pk.length());
            pk = new BinaryBackendEntry.BinaryId(
                buffer.writeId(pk).bytes(), pk);

            IdPrefixQuery idPrefixQuery = new IdPrefixQuery(HugeType.OLAP, pk);
            return idPrefixQuery;
        }
    }
+
+    @Override
+    public Number queryNumber(Query query) {
+        this.checkOpened();
+
+        Session session = this.sessions.session();
+        HstoreTable table = this.table(HstoreTable.tableType(query));
+        return table.queryNumber(session, query);
+    }
+
+    @Override
+    public synchronized void init() {
+        Lock writeLock = this.storeLock.writeLock();
+        writeLock.lock();
+        try {
+            // Create tables with main disk
+            this.sessions.createTable(this.tableNames().toArray(new 
String[0]));
+            LOG.debug("Store initialized: {}", this.store);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public void clear(boolean clearSpace) {
+        Lock writeLock = this.storeLock.writeLock();
+        writeLock.lock();
+        try {
+            // Drop tables with main disk
+            this.sessions.dropTable(this.tableNames().toArray(new String[0]));
+            if (clearSpace) {
+                this.sessions.clear();
+            }
+            LOG.debug("Store cleared: {}", this.store);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
    /**
     * Whether the store has been initialized.
     *
     * NOTE(review): hard-coded to true — presumably initialization state is
     * tracked on the server side; confirm before relying on this.
     */
    @Override
    public boolean initialized() {
        return true;
    }
+
+    @Override
+    public void truncate() {
+        try {
+            this.sessions.session().truncate();
+        } catch (Exception e) {
+            LOG.error("Store truncated failed", e);
+            return;
+        }
+        LOG.debug("Store truncated: {}", this.store);
+    }
+
    /**
     * Begin a transaction on the current thread's session.
     */
    @Override
    public void beginTx() {
        this.sessions.session().beginTx();
    }
+
+    @Override
+    public void commitTx() {
+        this.checkOpened();
+        Session session = this.sessions.session();
+        session.commit();
+    }
+
+    @Override
+    public void rollbackTx() {
+        this.checkOpened();
+        Session session = this.sessions.session();
+        session.rollback();
+    }
+
    // Intentionally a no-op placeholder; opened() calls it before checking
    // the session state. NOTE(review): confirm whether a real connection
    // check is planned here.
    private void checkConnectionOpened() {
    }
+
+    @Override
+    public Id nextId(HugeType type) {
+        long counter = 0L;
+        counter = this.getCounter(type);
+        E.checkState(counter != 0L, "Please check whether '%s' is OK",
+                     this.provider().type());
+        return IdGenerator.of(counter);
+    }
+
    /**
     * Ensure the counter of the given type is at least {@code lowest}.
     *
     * NOTE(review): delegates to increaseCounter(type, lowest), i.e. the
     * lowest value is passed as the increment amount — confirm the backend
     * interprets it as a lower bound rather than a delta.
     */
    @Override
    public void setCounterLowest(HugeType type, long lowest) {
        this.increaseCounter(type, lowest);
    }
+
+    /***************************** Store defines *****************************/
+
    /**
     * Schema store: holds schema metadata tables.
     *
     * NOTE(review): counter operations throw here, yet the base class
     * nextId() calls getCounter() — confirm schema id allocation happens
     * elsewhere (e.g. via the PD id client).
     */
    public static class HstoreSchemaStore extends HstoreStore {

        public HstoreSchemaStore(BackendStoreProvider provider, String namespace, String store) {
            super(provider, namespace, store);
        }

        @Override
        public boolean isSchemaStore() {
            return true;
        }

        // Counter ops are unsupported in the schema store
        @Override
        public void increaseCounter(HugeType type, long num) {
            throw new UnsupportedOperationException(
                "HstoreSchemaStore.increaseCounter()");
        }

        @Override
        public long getCounter(HugeType type) {
            throw new UnsupportedOperationException(
                "HstoreSchemaStore.getCounter()");
        }
    }
+
    /**
     * Graph store: registers and manages the vertex/edge/index/OLAP and
     * task/server-info data tables. Id/counter operations are not supported
     * here and always throw.
     */
    public static class HstoreGraphStore extends HstoreStore {

        public HstoreGraphStore(BackendStoreProvider provider,
                                String namespace, String store) {
            super(provider, namespace, store);

            // Register all data tables managed by the graph store
            registerTableManager(HugeTableType.VERTEX,
                                 new HstoreTables.Vertex(store));
            registerTableManager(HugeTableType.OUT_EDGE,
                                 HstoreTables.Edge.out(store));
            registerTableManager(HugeTableType.IN_EDGE,
                                 HstoreTables.Edge.in(store));
            registerTableManager(HugeTableType.ALL_INDEX_TABLE,
                                 new HstoreTables.IndexTable(store));
            registerTableManager(HugeTableType.OLAP_TABLE,
                                 new HstoreTables.OlapTable(store));
            registerTableManager(HugeTableType.TASK_INFO_TABLE,
                                 new HstoreTables.TaskInfo(store));
            registerTableManager(HugeTableType.SERVER_INFO_TABLE,
                                 new HstoreTables.ServerInfo(store));
        }

        @Override
        public boolean isSchemaStore() {
            return false;
        }

        // Id/counter operations are not supported by the graph store
        @Override
        public Id nextId(HugeType type) {
            throw new UnsupportedOperationException(
                "HstoreGraphStore.nextId()");
        }

        @Override
        public void increaseCounter(HugeType type, long num) {
            throw new UnsupportedOperationException(
                "HstoreGraphStore.increaseCounter()");
        }

        @Override
        public long getCounter(HugeType type) {
            throw new UnsupportedOperationException(
                "HstoreGraphStore.getCounter()");
        }

        // NOTE(review): pkId is unused below — OLAP data appears to share
        // one merged table, so create/register ignore the property-key id.
        @Override
        public void createOlapTable(Id pkId) {
            HstoreTable table = new HstoreTables.OlapTable(this.store());
            LOG.info("Hstore create olap table {}", table.table());
            super.sessions.createTable(table.table());
            LOG.info("Hstore finish create olap table");
            registerTableManager(HugeTableType.OLAP_TABLE, table);
            LOG.info("OLAP table {} has been created", table.table());
        }

        @Override
        public void checkAndRegisterOlapTable(Id pkId) {
            HstoreTable table = new HstoreTables.OlapTable(this.store());
            if (!super.sessions.existsTable(table.table())) {
                // NOTE(review): missing space between "recent" and
                // "operation" in this message (kept byte-identical here)
                LOG.error("Found exception: Table '{}' doesn't exist, we'll " +
                          "recreate it now. Please carefully check the recent" +
                          "operation in server and computer, then ensure the " +
                          "integrity of store file.", table.table());
                this.createOlapTable(pkId);
            } else {
                registerTableManager(HugeTableType.OLAP_TABLE, table);
            }
        }

        // Per-pk clear/remove are deliberate no-ops for the merged OLAP table
        @Override
        public void clearOlapTable(Id pkId) {
        }

        @Override
        public void removeOlapTable(Id pkId) {
        }

        @Override
        public boolean existOlapTable(Id pkId) {
            String tableName = this.olapTableName(pkId);
            return super.sessions.existsTable(tableName);
        }
    }
+
    /**
     * Version of the on-disk data format written by this backend.
     */
    @Override
    public String storedVersion() {
        return "1.13";
    }
+}
diff --git 
a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTable.java
 
b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTable.java
new file mode 100755
index 000000000..39e24a1d9
--- /dev/null
+++ 
b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTable.java
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.backend.id.EdgeId;
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.page.PageState;
+import org.apache.hugegraph.backend.query.Aggregate;
+import org.apache.hugegraph.backend.query.Aggregate.AggregateFunc;
+import org.apache.hugegraph.backend.query.Condition;
+import org.apache.hugegraph.backend.query.Condition.Relation;
+import org.apache.hugegraph.backend.query.ConditionQuery;
+import org.apache.hugegraph.backend.query.IdPrefixQuery;
+import org.apache.hugegraph.backend.query.IdRangeQuery;
+import org.apache.hugegraph.backend.query.Query;
+import org.apache.hugegraph.backend.serializer.BinaryBackendEntry;
+import org.apache.hugegraph.backend.serializer.BinaryEntryIterator;
+import org.apache.hugegraph.backend.store.BackendEntry;
+import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn;
+import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
+import org.apache.hugegraph.backend.store.BackendEntryIterator;
+import org.apache.hugegraph.backend.store.BackendTable;
+import org.apache.hugegraph.backend.store.Shard;
+import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Countable;
+import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Session;
+import org.apache.hugegraph.exception.NotSupportException;
+import org.apache.hugegraph.pd.client.PDClient;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.store.HgOwnerKey;
+import org.apache.hugegraph.store.client.util.HgStoreClientConst;
+import org.apache.hugegraph.type.HugeType;
+import org.apache.hugegraph.type.define.HugeKeys;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Log;
+import org.apache.hugegraph.util.StringEncoding;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.slf4j.Logger;
+
+public class HstoreTable extends BackendTable<Session, BackendEntry> {
+
+    private static final Logger LOG = Log.logger(HstoreStore.class);
+
+    private final HstoreShardSplitter shardSpliter;
+    Function<BackendEntry, byte[]> ownerDelegate = (entry) -> getOwner(entry);
+    Function<Id, byte[]> ownerByIdDelegate = (id) -> getOwnerId(id);
+    BiFunction<HugeType, Id, byte[]> ownerByQueryDelegate =
+        (type, id) -> getOwnerId(type, id);
+    Supplier<byte[]> ownerScanDelegate =
+        () -> HgStoreClientConst.ALL_PARTITION_OWNER;
+
+    public HstoreTable(String database, String table) {
+        super(String.format("%s+%s", database, table));
+        this.shardSpliter = new HstoreShardSplitter(this.table());
+    }
+
+    public static ConditionQuery removeDirectionCondition(ConditionQuery 
conditionQuery) {
+        Collection<Condition> conditions = conditionQuery.conditions();
+        List<Condition> newConditions = new ArrayList<>();
+        for (Condition condition : conditions) {
+            if (!direction(condition)) {
+                newConditions.add(condition);
+            }
+        }
+        if (newConditions.size() > 0) {
+            conditionQuery.resetConditions(newConditions);
+            return conditionQuery;
+        } else {
+            return null;
+        }
+    }
+
+    private static boolean direction(Condition condition) {
+        boolean direction = true;
+        List<? extends Relation> relations = condition.relations();
+        for (Relation r : relations) {
+            if (!r.key().equals(HugeKeys.DIRECTION)) {
+                direction = false;
+                break;
+            }
+        }
+        return direction;
+    }
+
    /**
     * Wrap a raw column iterator into a BackendEntry iterator, folding
     * consecutive columns that belong to the same entry together.
     */
    protected static BackendEntryIterator newEntryIterator(
        BackendColumnIterator cols, Query query) {
        return new BinaryEntryIterator<>(cols, query, (entry, col) -> {
            if (entry == null || !entry.belongToMe(col)) {
                HugeType type = query.resultType();
                // NOTE: only support BinaryBackendEntry currently
                entry = new BinaryBackendEntry(type, col.name);
            }
            entry.columns(col);
            return entry;
        });
    }
+
    /**
     * Same as newEntryIterator but marks created entries with the OLAP flag,
     * which is passed through to the BinaryBackendEntry constructor.
     */
    protected static BackendEntryIterator newEntryIteratorOlap(
        BackendColumnIterator cols, Query query, boolean isOlap) {
        return new BinaryEntryIterator<>(cols, query, (entry, col) -> {
            if (entry == null || !entry.belongToMe(col)) {
                HugeType type = query.resultType();
                // NOTE: only support BinaryBackendEntry currently
                entry = new BinaryBackendEntry(type, col.name, false, isOlap);
            }
            entry.columns(col);
            return entry;
        });
    }
+
+    public static String bytes2String(byte[] bytes) {
+        StringBuilder result = new StringBuilder();
+        for (byte b : bytes) {
+            String st = String.format("%02x", b);
+            result.append(st);
+        }
+        return result.toString();
+    }
+
    /**
     * Register metadata handlers: "splits" returns the shard list for the
     * given split size (one long argument), used for parallel scans.
     */
    @Override
    protected void registerMetaHandlers() {
        this.registerMetaHandler("splits", (session, meta, args) -> {
            E.checkArgument(args.length == 1,
                            "The args count of %s must be 1", meta);
            long splitSize = (long) args[0];
            return this.shardSpliter.getSplits(session, splitSize);
        });
    }
+
    /**
     * No per-table initialization: tables are created at the store level.
     */
    @Override
    public void init(Session session) {
        // pass
    }
+
    /**
     * No per-table clearing: tables are dropped at the store level.
     */
    @Override
    public void clear(Session session) {
        // pass
    }
+
    // Whether this is an OLAP table; presumably overridden to return true
    // by the OLAP table subclass — confirm in HstoreTables.
    public boolean isOlap() {
        return false;
    }
+
+    private byte[] getOwner(BackendEntry entry) {
+        if (entry == null) {
+            return HgStoreClientConst.ALL_PARTITION_OWNER;
+        }
+        Id id = entry.type().isIndex() ? entry.id() : entry.originId();
+        return getOwnerId(id);
+    }
+
    /**
     * Returns the delegate producing the all-partitions owner used by scans.
     */
    public Supplier<byte[]> getOwnerScanDelegate() {
        return ownerScanDelegate;
    }
+
+    public byte[] getInsertEdgeOwner(BackendEntry entry) {
+        Id id = entry.originId();
+        id = ((EdgeId) id).ownerVertexId();
+        return id.asBytes();
+    }
+
+    public byte[] getInsertOwner(BackendEntry entry) {
+        // 为适应label索引散列,不聚焦在一个分区
+        if (entry.type().isLabelIndex() && (entry.columns().size() == 1)) {
+            Iterator<BackendColumn> iterator = entry.columns().iterator();
+            while (iterator.hasNext()) {
+                BackendColumn next = iterator.next();
+                return next.name;
+            }
+        }
+
+        Id id = entry.type().isIndex() ? entry.id() : entry.originId();
+        return getOwnerId(id);
+    }
+
    /**
     * Returns the id of the vertex that owns the given id (routing key).
     *
     * Binary-wrapped ids are unwrapped first; edge ids route by their owner
     * vertex; a null id routes to all partitions.
     */
    protected byte[] getOwnerId(Id id) {
        if (id instanceof BinaryBackendEntry.BinaryId) {
            id = ((BinaryBackendEntry.BinaryId) id).origin();
        }
        if (id != null && id.edge()) {
            id = ((EdgeId) id).ownerVertexId();
        }
        return id != null ? id.asBytes() :
               HgStoreClientConst.ALL_PARTITION_OWNER;
    }
+
+    /**
+     * 返回Id所属的点ID
+     *
+     * @param id
+     * @return
+     */
+    protected byte[] getOwnerId(HugeType type, Id id) {
+        if (type.equals(HugeType.VERTEX) || type.equals(HugeType.EDGE) ||
+            type.equals(HugeType.EDGE_OUT) || type.equals(HugeType.EDGE_IN) ||
+            type.equals(HugeType.COUNTER)) {
+            return getOwnerId(id);
+        } else {
+            return HgStoreClientConst.ALL_PARTITION_OWNER;
+        }
+    }
+
+    @Override
+    public void insert(Session session, BackendEntry entry) {
+        byte[] owner = entry.type().isEdge() ? getInsertEdgeOwner(entry) : 
getInsertOwner(entry);
+        ArrayList<BackendColumn> columns = new ArrayList<>(entry.columns());
+        for (int i = 0; i < columns.size(); i++) {
+            BackendColumn col = columns.get(i);
+            session.put(this.table(), owner, col.name, col.value);
+        }
+    }
+
+    public void insert(Session session, BackendEntry entry, boolean isEdge) {
+        byte[] owner = isEdge ? getInsertEdgeOwner(entry) : 
getInsertOwner(entry);
+        ArrayList<BackendColumn> columns = new ArrayList<>(entry.columns());
+        for (int i = 0; i < columns.size(); i++) {
+            BackendColumn col = columns.get(i);
+            session.put(this.table(), owner, col.name, col.value);
+        }
+    }
+
+    @Override
+    public void delete(Session session, BackendEntry entry) {
+        byte[] ownerKey = ownerDelegate.apply(entry);
+        if (entry.columns().isEmpty()) {
+            byte[] idBytes = entry.id().asBytes();
+            // LOG.debug("Delete from {} with owner {}, id: {}",
+            //          this.table(), bytes2String(ownerKey), idBytes);
+            session.delete(this.table(), ownerKey, idBytes);
+        } else {
+            for (BackendColumn col : entry.columns()) {
+                // LOG.debug("Delete from {} with owner {}, id: {}",
+                //          this.table(), bytes2String(ownerKey),
+                //          bytes2String(col.name));
+                assert entry.belongToMe(col) : entry;
+                session.delete(this.table(), ownerKey, col.name);
+            }
+        }
+    }
+
    /**
     * Append a single column to an existing entry; implemented as a plain
     * insert (presumably put() upserts by key — confirm store semantics).
     */
    @Override
    public void append(Session session, BackendEntry entry) {
        assert entry.columns().size() == 1;
        this.insert(session, entry);
    }
+
    /**
     * Remove a single column from an existing entry; implemented as a
     * plain delete of that column.
     */
    @Override
    public void eliminate(Session session, BackendEntry entry) {
        assert entry.columns().size() == 1;
        this.delete(session, entry);
    }
+
+    @Override
+    public boolean queryExist(Session session, BackendEntry entry) {
+        Id id = entry.id();
+        try (BackendColumnIterator iter = this.queryById(session, id)) {
+            return iter.hasNext();
+        }
+    }
+
+    @Override
+    public Number queryNumber(Session session, Query query) {
+        Aggregate aggregate = query.aggregateNotNull();
+        if (aggregate.func() != AggregateFunc.COUNT) {
+            throw new NotSupportException(aggregate.toString());
+        }
+
+        assert aggregate.func() == AggregateFunc.COUNT;
+        assert query.noLimit();
+        Iterator<BackendColumn> results = this.queryBy(session, query);
+        if (results instanceof Countable) {
+            return ((Countable) results).count();
+        }
+        return IteratorUtils.count(results);
+    }
+
+    @Override
+    public Iterator<BackendEntry> query(Session session, Query query) {
+        if (query.limit() == 0L && !query.noLimit()) {
+            // LOG.debug("Return empty result(limit=0) for query {}", query);
+            return Collections.emptyIterator();
+        }
+        return newEntryIterator(this.queryBy(session, query), query);
+    }
+
+    @Override
+    public Iterator<BackendEntry> queryOlap(Session session, Query query) {
+        if (query.limit() == 0L && !query.noLimit()) {
+            // LOG.debug("Return empty result(limit=0) for query {}", query);
+            return Collections.emptyIterator();
+        }
+        return newEntryIteratorOlap(this.queryBy(session, query), query, true);
+    }
+
+    public List<Iterator<BackendEntry>> query(Session session,
+                                              List<IdPrefixQuery> queries,
+                                              String tableName) {
+        List<BackendColumnIterator> queryByPrefixList =
+            this.queryByPrefixList(session, queries, tableName);
+        LinkedList<Iterator<BackendEntry>> iterators = new LinkedList<>();
+        for (int i = 0; i < queryByPrefixList.size(); i++) {
+            IdPrefixQuery q = queries.get(i).copy();
+            q.capacity(Query.NO_CAPACITY);
+            q.limit(Query.NO_LIMIT);
+            BackendEntryIterator iterator =
+                newEntryIterator(queryByPrefixList.get(i), q);
+            iterators.add(iterator);
+        }
+        return iterators;
+    }
+
    /**
     * Streamed batch prefix query: lazily converts each incoming
     * IdPrefixQuery into an owner key and yields one entry iterator per
     * store result batch.
     *
     * The first query is consumed eagerly to derive the scan type, the
     * pushed-down condition bytes, and a query template (capacity/limit
     * unbounded) that is shared for deserializing every result batch —
     * this assumes all queries in the stream have compatible settings.
     */
    public BackendEntry.BackendIterator<Iterator<BackendEntry>> query(Session session,
                                                                      Iterator<IdPrefixQuery> queries,
                                                                      String tableName) {
        // One-slot array so the anonymous iterator below can replay the
        // eagerly-consumed first query before draining `queries`
        final IdPrefixQuery[] first = {queries.next()};
        int type = first[0].withProperties() ? 0 : Session.SCAN_KEY_ONLY;

        IdPrefixQuery queryTmpl = first[0].copy();
        queryTmpl.capacity(Query.NO_CAPACITY);
        queryTmpl.limit(Query.NO_LIMIT);

        ConditionQuery originQuery = (ConditionQuery) first[0].originQuery();
        if (originQuery != null) {
            originQuery = prepareConditionQueryList(originQuery);
        }
        byte[] queryBytes = originQuery == null ? null : originQuery.bytes();

        BackendEntry.BackendIterator<BackendColumnIterator> it
            = session.scan(tableName, new Iterator<HgOwnerKey>() {
            @Override
            public boolean hasNext() {
                if (first[0] != null) {
                    return true;
                }
                return queries.hasNext();
            }

            @Override
            public HgOwnerKey next() {
                // Replay the first query exactly once, then delegate
                IdPrefixQuery query = first[0] != null ? first[0] : queries.next();
                first[0] = null;
                byte[] prefix = ownerByQueryDelegate.apply(query.resultType(),
                                                           query.prefix());
                return HgOwnerKey.of(prefix, query.prefix().asBytes());
            }
        }, type, first[0], queryBytes);
        // Wrap each raw column batch into a BackendEntry iterator on demand
        return new BackendEntry.BackendIterator<Iterator<BackendEntry>>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public Iterator<BackendEntry> next() {
                BackendEntryIterator iterator = newEntryIterator(it.next(), queryTmpl);
                return iterator;
            }

            @Override
            public void close() {
                it.close();
            }

            @Override
            public byte[] position() {
                // Streamed results are not resumable
                return new byte[0];
            }
        };
    }
+
+    protected BackendColumnIterator queryBy(Session session, Query query) {
+        // Query all
+        if (query.empty()) {
+            return this.queryAll(session, query);
+        }
+
+        // Query by prefix
+        if (query instanceof IdPrefixQuery) {
+            IdPrefixQuery pq = (IdPrefixQuery) query;
+            return this.queryByPrefix(session, pq);
+        }
+
+        // Query by range
+        if (query instanceof IdRangeQuery) {
+            IdRangeQuery rq = (IdRangeQuery) query;
+            return this.queryByRange(session, rq);
+        }
+
+        // Query by id
+        if (query.conditions().isEmpty()) {
+            assert !query.ids().isEmpty();
+            // 单个id查询 走get接口查询
+            if (query.ids().size() == 1) {
+                return this.getById(session, query.ids().iterator().next());
+            }
+            // NOTE: this will lead to lazy create rocksdb iterator
+            LinkedList<HgOwnerKey> hgOwnerKeys = new LinkedList<>();
+            for (Id id : query.ids()) {
+                hgOwnerKeys.add(HgOwnerKey.of(this.ownerByIdDelegate.apply(id),
+                                              id.asBytes()));
+            }
+            BackendColumnIterator withBatch = 
session.getWithBatch(this.table(),
+                                                                   
hgOwnerKeys);
+            return BackendColumnIterator.wrap(withBatch);
+        }
+
+        // Query by condition (or condition + id)
+        ConditionQuery cq = (ConditionQuery) query;
+        return this.queryByCond(session, cq);
+    }
+
+    protected BackendColumnIterator queryAll(Session session, Query query) {
+        if (query.paging()) {
+            PageState page = PageState.fromString(query.page());
+            byte[] ownerKey = this.getOwnerScanDelegate().get();
+            int scanType = Session.SCAN_ANY |
+                           (query.withProperties() ? 0 : 
Session.SCAN_KEY_ONLY);
+            byte[] queryBytes = query instanceof ConditionQuery ?
+                                ((ConditionQuery) query).bytes() : null;
+            // LOG.debug("query {} with ownerKeyFrom: {}, ownerKeyTo: {}, " +
+            //          "keyFrom: null, keyTo: null, scanType: {}, " +
+            //          "conditionQuery: {}, position: {}",
+            //          this.table(), bytes2String(ownerKey),
+            //          bytes2String(ownerKey), scanType,
+            //          queryBytes, page.position());
+            return session.scan(this.table(), ownerKey, ownerKey, null,
+                                null, scanType, queryBytes,
+                                page.position());
+        }
+        return session.scan(this.table(),
+                            query instanceof ConditionQuery ?
+                            ((ConditionQuery) query).bytes() : null);
+    }
+
    /**
     * Query all columns whose key starts with the given id (prefix scan).
     */
    protected BackendColumnIterator queryById(Session session, Id id) {
        // TODO: change to get() after vertex and schema don't use id prefix
        return session.scan(this.table(), this.ownerByIdDelegate.apply(id),
                            id.asBytes());
    }
+
+    protected BackendColumnIterator getById(Session session, Id id) {
+        byte[] value = session.get(this.table(),
+                                   this.ownerByIdDelegate.apply(id),
+                                   id.asBytes());
+        if (value.length == 0) {
+            return BackendColumnIterator.empty();
+        }
+        BackendColumn col = BackendColumn.of(id.asBytes(), value);
+        return BackendColumnIterator.iterator(col);
+    }
+
    /**
     * Prefix scan from query.start() to the end of the prefix range,
     * optionally resuming from a page position and pushing the remaining
     * condition down to the store.
     */
    protected BackendColumnIterator queryByPrefix(Session session,
                                                  IdPrefixQuery query) {
        int type = query.inclusiveStart() ?
                   Session.SCAN_GTE_BEGIN : Session.SCAN_GT_BEGIN;
        type |= Session.SCAN_PREFIX_END;
        byte[] position = null;
        if (query.paging()) {
            position = PageState.fromString(query.page()).position();
        }
        ConditionQuery originQuery = (ConditionQuery) query.originQuery();
        if (originQuery != null) {
            // Strip conditions that do not need to be evaluated store-side
            originQuery = prepareConditionQuery(originQuery);
        }
        byte[] ownerKeyFrom = this.ownerByQueryDelegate.apply(query.resultType(),
                                                              query.start());
        byte[] ownerKeyTo = this.ownerByQueryDelegate.apply(query.resultType(),
                                                            query.prefix());
        byte[] keyFrom = query.start().asBytes();
        // In paged prefix queries the scan starts from the original prefix
        // position, because each partition scans from that same start
        if (query.paging()) {
            keyFrom = query.prefix().asBytes();
        }
        byte[] keyTo = query.prefix().asBytes();
        byte[] queryBytes = originQuery == null ?
                            null :
                            originQuery.bytes();

        return session.scan(this.table(), ownerKeyFrom, ownerKeyTo, keyFrom,
                            keyTo, type, queryBytes, position);
    }
+
+    protected List<BackendColumnIterator> queryByPrefixList(
+        Session session,
+        List<IdPrefixQuery> queries,
+        String tableName) {
+        E.checkArgument(queries.size() > 0,
+                        "The size of queries must be greater than zero");
+        IdPrefixQuery query = queries.get(0);
+        int type = 0;
+        LinkedList<HgOwnerKey> ownerKey = new LinkedList<>();
+        queries.forEach((item) -> {
+            byte[] prefix = this.ownerByQueryDelegate.apply(item.resultType(),
+                                                            item.prefix());
+            ownerKey.add(HgOwnerKey.of(prefix, item.prefix().asBytes()));
+        });
+        ConditionQuery originQuery = (ConditionQuery) query.originQuery();
+        if (originQuery != null) {
+            originQuery = prepareConditionQueryList(originQuery);
+        }
+        byte[] queryBytes = originQuery == null ? null : originQuery.bytes();
+
+        // LOG.debug("query {} with scanType: {}, limit: {}, conditionQuery:
+        // {}", this.table(), type, query.limit(), queryBytes);
+        return session.scan(tableName, ownerKey, type,
+                            query.limit(), queryBytes);
+    }
+
+    /***
+     * Prepare ConditionQuery to do operator sinking, because some scenes do 
not need to be
+     * preserved
+     * @param conditionQuery
+     * @return
+     */
+    private ConditionQuery prepareConditionQuery(ConditionQuery 
conditionQuery) {
+        if (CollectionUtils.isEmpty(conditionQuery.userpropConditions())) {
+            return null;
+        }
+        // only userpropConditions can send to store
+        Collection<Condition> conditions = conditionQuery.conditions();
+        List<Condition> newConditions = new ArrayList<>();
+        for (Condition condition : conditions) {
+            if (!onlyOwnerVertex(condition)) {
+                newConditions.add(condition);
+            }
+        }
+        if (newConditions.size() > 0) {
+            conditionQuery.resetConditions(newConditions);
+            return conditionQuery;
+        } else {
+            return null;
+        }
+    }
+
+    /***
+     * Prepare ConditionQuery to do operator sinking, because some scenes do 
not need to be
+     * preserved
+     * @param conditionQuery
+     * @return
+     */
+    private ConditionQuery prepareConditionQueryList(ConditionQuery 
conditionQuery) {
+        if (!conditionQuery.containsLabelOrUserpropRelation()) {
+            return null;
+        }
+        // only userpropConditions can send to store
+        Collection<Condition> conditions = conditionQuery.conditions();
+        List<Condition> newConditions = new ArrayList<>();
+        for (Condition condition : conditions) {
+            if (!onlyOwnerVertex(condition)) {
+                newConditions.add(condition);
+            }
+        }
+        if (newConditions.size() > 0) {
+            conditionQuery.resetConditions(newConditions);
+            return conditionQuery;
+        } else {
+            return null;
+        }
+    }
+
+    private boolean onlyOwnerVertex(Condition condition) {
+        boolean onlyOwnerVertex = true;
+        List<? extends Relation> relations = condition.relations();
+        for (Relation r : relations) {
+            if (!r.key().equals(HugeKeys.OWNER_VERTEX)) {
+                onlyOwnerVertex = false;
+                break;
+            }
+        }
+        return onlyOwnerVertex;
+    }
+
    /**
     * Scan an id range (bound inclusiveness controlled by query flags),
     * optionally paged, pushing the origin condition down to the store
     * for vertex/edge results.
     */
    protected BackendColumnIterator queryByRange(Session session,
                                                 IdRangeQuery query) {
        byte[] start = query.start().asBytes();
        byte[] end = query.end() == null ? null : query.end().asBytes();
        int type = query.inclusiveStart() ?
                   Session.SCAN_GTE_BEGIN : Session.SCAN_GT_BEGIN;
        if (end != null) {
            type |= query.inclusiveEnd() ?
                    Session.SCAN_LTE_END : Session.SCAN_LT_END;
        }
        ConditionQuery cq;
        Query origin = query.originQuery();
        byte[] position = null;
        if (query.paging() && !query.page().isEmpty()) {
            position = PageState.fromString(query.page()).position();
        }
        byte[] ownerStart = this.ownerByQueryDelegate.apply(query.resultType(),
                                                            query.start());
        byte[] ownerEnd = this.ownerByQueryDelegate.apply(query.resultType(),
                                                          query.end());
        if (origin instanceof ConditionQuery &&
            (query.resultType().isEdge() || query.resultType().isVertex())) {
            // Push the condition bytes down for store-side filtering
            cq = (ConditionQuery) query.originQuery();

            return session.scan(this.table(), ownerStart,
                                ownerEnd, start, end, type, cq.bytes(), position);
        }
        return session.scan(this.table(), ownerStart,
                            ownerEnd, start, end, type, null, position);
    }
+
+    protected BackendColumnIterator queryByCond(Session session,
+                                                ConditionQuery query) {
+        if (query.containsScanCondition()) {
+            E.checkArgument(query.relations().size() == 1,
+                            "Invalid scan with multi conditions: %s", query);
+            Relation scan = query.relations().iterator().next();
+            Shard shard = (Shard) scan.value();
+            return this.queryByRange(session, shard, query);
+        }
+        // throw new NotSupportException("query: %s", query);
+        return this.queryAll(session, query);
+    }
+
+    protected BackendColumnIterator queryByRange(Session session, Shard shard,
+                                                 ConditionQuery query) {
+        int type = Session.SCAN_GTE_BEGIN;
+        type |= Session.SCAN_LT_END;
+        type |= Session.SCAN_HASHCODE;
+        type |= query.withProperties() ? 0 : Session.SCAN_KEY_ONLY;
+
+        int start = Integer.parseInt(StringUtils.isEmpty(shard.start()) ?
+                                     "0" : shard.start());
+        int end = Integer.parseInt(StringUtils.isEmpty(shard.end()) ?
+                                   "0" : shard.end());
+        byte[] queryBytes = query.bytes();
+        String page = query.page();
+        if (page != null && !page.isEmpty()) {
+            byte[] position = PageState.fromString(page).position();
+            return session.scan(this.table(), start, end, type, queryBytes,
+                                position);
+        }
+        return session.scan(this.table(), start, end, type, queryBytes);
+    }
+
    /**
     * Shard splitter backed by the PD partition table; falls back to the
     * generic splitter when PD returns no partitions or the call fails.
     */
    private static class HstoreShardSplitter extends ShardSplitter<Session> {

        public HstoreShardSplitter(String table) {
            super(table);
        }

        @Override
        public List<Shard> getSplits(Session session, long splitSize) {
            E.checkArgument(splitSize >= MIN_SHARD_SIZE,
                            "The split-size must be >= %s bytes, but got %s",
                            MIN_SHARD_SIZE, splitSize);

            List<Shard> splits = new ArrayList<>();
            try {
                PDClient pdClient = HstoreSessionsImpl.getDefaultPdClient();
                List<Metapb.Partition> partitions = pdClient.getPartitions(0,
                                                                           session.getGraphName());
                for (Metapb.Partition partition : partitions) {
                    // One shard per PD partition [startKey, endKey)
                    String start = String.valueOf(partition.getStartKey());
                    String end = String.valueOf(partition.getEndKey());
                    splits.add(new Shard(start, end, 0));
                }
            } catch (PDException e) {
                // NOTE(review): should go through a logger, not stderr;
                // on failure we fall back to super.getSplits() below
                e.printStackTrace();
            }

            return splits.size() != 0 ?
                   splits : super.getSplits(session, splitSize);
        }

        // Size/key estimation is not supported by hstore; return fixed 1
        @Override
        public long estimateDataSize(Session session) {
            return 1L;
        }

        @Override
        public long estimateNumKeys(Session session) {
            return 1L;
        }

        @Override
        public byte[] position(String position) {
            // END is a sentinel meaning "scan exhausted" — no position
            if (END.equals(position)) {
                return null;
            }
            return StringEncoding.decodeBase64(position);
        }
    }
+}
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTables.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTables.java
new file mode 100644
index 000000000..f4ebf7ebe
--- /dev/null
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTables.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore;
+
+import java.util.List;
+
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.query.Condition;
+import org.apache.hugegraph.backend.query.Condition.Relation;
+import org.apache.hugegraph.backend.query.ConditionQuery;
+import org.apache.hugegraph.backend.serializer.BinarySerializer;
+import org.apache.hugegraph.backend.store.BackendEntry;
+import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
+import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Session;
+import org.apache.hugegraph.type.HugeTableType;
+import org.apache.hugegraph.type.HugeType;
+import org.apache.hugegraph.type.define.HugeKeys;
+import org.apache.hugegraph.util.E;
+
/**
 * Concrete hstore tables: vertex, in/out edge, task/server info, the
 * unified index table and the unified OLAP table.
 */
public class HstoreTables {

    public static class Vertex extends HstoreTable {

        public static final String TABLE = HugeTableType.VERTEX.string();

        public Vertex(String database) {
            super(database, TABLE);
        }

        // Vertex ids are exact keys, so use point-get instead of prefix scan
        @Override
        protected BackendColumnIterator queryById(Session session, Id id) {
            return this.getById(session, id);
        }
    }

    /**
     * Table storing task info records.
     */
    public static class TaskInfo extends HstoreTable {
        public static final String TABLE = HugeTableType.TASK_INFO_TABLE.string();

        public TaskInfo(String database) {
            super(database, TABLE);
        }

        @Override
        protected BackendColumnIterator queryById(Session session, Id id) {
            return this.getById(session, id);
        }
    }

    /**
     * Table storing server info records.
     */
    public static class ServerInfo extends HstoreTable {
        public static final String TABLE = HugeTableType.SERVER_INFO_TABLE.string();

        public ServerInfo(String database) {
            super(database, TABLE);
        }

        @Override
        protected BackendColumnIterator queryById(Session session, Id id) {
            return this.getById(session, id);
        }
    }

    public static class Edge extends HstoreTable {

        public static final String TABLE_SUFFIX = HugeType.EDGE.string();

        public Edge(boolean out, String database) {
            // Edge out/in table
            super(database, (out ? HugeTableType.OUT_EDGE.string() :
                             HugeTableType.IN_EDGE.string()));
        }

        public static Edge out(String database) {
            return new Edge(true, database);
        }

        public static Edge in(String database) {
            return new Edge(false, database);
        }

        @Override
        protected BackendColumnIterator queryById(Session session, Id id) {
            return this.getById(session, id);
        }
    }

    public static class IndexTable extends HstoreTable {

        public static final String TABLE = HugeTableType.ALL_INDEX_TABLE.string();

        public IndexTable(String database) {
            super(database, TABLE);
        }

        @Override
        public void eliminate(Session session, BackendEntry entry) {
            assert entry.columns().size() == 1;
            super.delete(session, entry);
        }

        @Override
        public void delete(Session session, BackendEntry entry) {
            /*
             * Only delete index by label will come here
             * Regular index delete will call eliminate()
             */
            byte[] ownerKey = super.ownerDelegate.apply(entry);
            for (BackendEntry.BackendColumn column : entry.columns()) {
                // Don't assert entry.belongToMe(column), length-prefix is 1*
                session.deletePrefix(this.table(), ownerKey, column.name);
            }
        }

        /**
         * Mainly used for range-type index queries: translates the
         * sysprop ID conditions (PREFIX/GT/GTE/LT/LTE) into a key-range
         * scan over the index table.
         *
         * @param session current hstore session
         * @param query   condition query carrying ID range conditions
         * @return iterator over the matched index columns
         */
        @Override
        protected BackendColumnIterator queryByCond(Session session,
                                                    ConditionQuery query) {
            assert !query.conditions().isEmpty();

            List<Condition> conds = query.syspropConditions(HugeKeys.ID);
            E.checkArgument(!conds.isEmpty(),
                            "Please specify the index conditions");

            Id prefix = null;
            Id min = null;
            boolean minEq = false;
            Id max = null;
            boolean maxEq = false;

            for (Condition c : conds) {
                Relation r = (Relation) c;
                switch (r.relation()) {
                    case PREFIX:
                        prefix = (Id) r.value();
                        break;
                    case GTE:
                        minEq = true;
                        // fall through
                    case GT:
                        min = (Id) r.value();
                        break;
                    case LTE:
                        maxEq = true;
                        // fall through
                    case LT:
                        max = (Id) r.value();
                        break;
                    default:
                        E.checkArgument(false, "Unsupported relation '%s'",
                                        r.relation());
                }
            }

            E.checkArgumentNotNull(min, "Range index begin key is missing");
            byte[] begin = min.asBytes();
            if (!minEq) {
                // Exclusive lower bound: advance begin past min in place
                BinarySerializer.increaseOne(begin);
            }
            byte[] ownerStart = this.ownerScanDelegate.get();
            byte[] ownerEnd = this.ownerScanDelegate.get();
            if (max == null) {
                // No upper bound: scan to the end of the prefix range
                E.checkArgumentNotNull(prefix, "Range index prefix is missing");
                return session.scan(this.table(), ownerStart, ownerEnd, begin,
                                    prefix.asBytes(), Session.SCAN_PREFIX_END);
            } else {
                byte[] end = max.asBytes();
                int type = maxEq ? Session.SCAN_LTE_END : Session.SCAN_LT_END;
                return session.scan(this.table(), ownerStart,
                                    ownerEnd, begin, end, type);
            }
        }
    }

    public static class OlapTable extends HstoreTable {

        public static final String TABLE = HugeTableType.OLAP_TABLE.string();

        public OlapTable(String database) {
            // The former per-property ap_{pk_id} tables were merged into
            // this single olap table
            super(database, TABLE);
        }

        @Override
        protected BackendColumnIterator queryById(Session session, Id id) {
            return this.getById(session, id);
        }

        @Override
        public boolean isOlap() {
            return true;
        }
    }
}
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/IdClient.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/IdClient.java
new file mode 100644
index 000000000..3e42bce2d
--- /dev/null
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/IdClient.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore.fake;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.hugegraph.backend.store.hstore.HstoreSessions;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+
+public abstract class IdClient {
+
+    protected HstoreSessions.Session session;
+    protected String table;
+
+    public IdClient(HstoreSessions.Session session, String table) {
+        this.session = session;
+        this.table = table;
+    }
+
+    protected static byte[] b(long value) {
+        return ByteBuffer.allocate(Long.BYTES).order(
+                ByteOrder.nativeOrder()).putLong(value).array();
+    }
+
+    protected static long l(byte[] bytes) {
+        assert bytes.length == Long.BYTES;
+        return ByteBuffer.wrap(bytes).order(
+                ByteOrder.nativeOrder()).getLong();
+    }
+
+    public abstract Pdpb.GetIdResponse getIdByKey(String key, int delta)
+            throws Exception;
+
+    public abstract Pdpb.ResetIdResponse resetIdByKey(String key) throws 
Exception;
+
+    public abstract void increaseId(String key, long increment)
+            throws Exception;
+}
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/PDIdClient.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/PDIdClient.java
new file mode 100644
index 000000000..0dbfc56ee
--- /dev/null
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/PDIdClient.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.hstore.fake;
+
+import org.apache.hugegraph.backend.store.hstore.HstoreSessions;
+import org.apache.hugegraph.backend.store.hstore.HstoreSessionsImpl;
+import org.apache.hugegraph.pd.client.PDClient;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+
/**
 * IdClient implementation that delegates all id operations to PD.
 */
public class PDIdClient extends IdClient {

    // Shared PD client obtained from the session factory
    PDClient pdClient;

    public PDIdClient(HstoreSessions.Session session, String table) {
        super(session, table);
        pdClient = HstoreSessionsImpl.getDefaultPdClient();
    }

    /** Fetch (and advance) an id range of size {@code delta} from PD. */
    @Override
    public Pdpb.GetIdResponse getIdByKey(String key, int delta) throws Exception {
        return pdClient.getIdByKey(key, delta);
    }

    /** Reset the PD id counter bound to the key. */
    @Override
    public Pdpb.ResetIdResponse resetIdByKey(String key) throws Exception {
        return pdClient.resetIdByKey(key);
    }

    /**
     * Advance the id counter by {@code increment}.
     * NOTE(review): implemented via getIdByKey with the response discarded,
     * and the long increment is narrowed to int — confirm both are intended.
     */
    @Override
    public void increaseId(String key, long increment) throws Exception {
        pdClient.getIdByKey(key, (int) increment);
    }
}

Reply via email to