This is an automated email from the ASF dual-hosted git repository. vgalaxies pushed a commit to branch intro-hstore in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 83086b1999c4bbf252ddcfec7a9ca4efa0aa1044 Author: VGalaxies <[email protected]> AuthorDate: Fri Apr 26 13:48:31 2024 +0800 git add hugegraph-server/hugegraph-hstore/ --- hugegraph-server/hugegraph-hstore/pom.xml | 50 ++ .../backend/store/hstore/HstoreFeatures.java | 133 ++++ .../backend/store/hstore/HstoreMetrics.java | 44 ++ .../backend/store/hstore/HstoreOptions.java | 52 ++ .../backend/store/hstore/HstoreProvider.java | 54 ++ .../backend/store/hstore/HstoreSessions.java | 208 ++++++ .../backend/store/hstore/HstoreSessionsImpl.java | 802 ++++++++++++++++++++ .../backend/store/hstore/HstoreStore.java | 825 +++++++++++++++++++++ .../backend/store/hstore/HstoreTable.java | 732 ++++++++++++++++++ .../backend/store/hstore/HstoreTables.java | 214 ++++++ .../backend/store/hstore/fake/IdClient.java | 54 ++ .../backend/store/hstore/fake/PDIdClient.java | 48 ++ 12 files changed, 3216 insertions(+) diff --git a/hugegraph-server/hugegraph-hstore/pom.xml b/hugegraph-server/hugegraph-hstore/pom.xml new file mode 100644 index 000000000..f777eb05e --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/pom.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with this + work for additional information regarding copyright ownership. The ASF + licenses this file to You under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + --> +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>hugegraph-server</artifactId> + <groupId>org.apache.hugegraph</groupId> + <version>${revision}</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>hugegraph-hstore</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.hugegraph</groupId> + <artifactId>hugegraph-core</artifactId> + <version>${revision}</version> + </dependency> + <dependency> + <groupId>org.apache.hugegraph</groupId> + <artifactId>hg-store-client</artifactId> + <version>${revision}</version> + </dependency> + <dependency> + <groupId>org.apache.hugegraph</groupId> + <artifactId>hg-pd-client</artifactId> + <version>${revision}</version> + </dependency> + + </dependencies> +</project> diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreFeatures.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreFeatures.java new file mode 100644 index 000000000..3af6f803b --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreFeatures.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore; + +import org.apache.hugegraph.backend.store.BackendFeatures; + +public class HstoreFeatures implements BackendFeatures { + + @Override + public boolean supportsScanToken() { + return false; + } + + @Override + public boolean supportsScanKeyPrefix() { + return true; + } + + @Override + public boolean supportsScanKeyRange() { + return true; + } + + @Override + public boolean supportsQuerySchemaByName() { + return false; + } + + @Override + public boolean supportsQueryByLabel() { + return false; + } + + @Override + public boolean supportsQueryWithInCondition() { + return false; + } + + @Override + public boolean supportsQueryWithRangeCondition() { + return true; + } + + @Override + public boolean supportsQuerySortByInputIds() { + return true; + } + + @Override + public boolean supportsQueryWithOrderBy() { + return true; + } + + @Override + public boolean supportsQueryWithContains() { + return false; + } + + @Override + public boolean supportsQueryWithContainsKey() { + return false; + } + + @Override + public boolean supportsQueryByPage() { + return true; + } + + @Override + public boolean supportsDeleteEdgeByLabel() { + return false; + } + + @Override + public boolean supportsUpdateVertexProperty() { + // Vertex properties are stored in a cell(column value) + return false; + } + + @Override + public boolean supportsMergeVertexProperty() { + return false; + } + + @Override + public boolean supportsUpdateEdgeProperty() { + // Edge properties are stored in a cell(column value) + return false; + } + + @Override + public boolean supportsTransaction() { + return false; + } + + @Override + public boolean supportsNumberType() { + return false; + } + + @Override + public boolean supportsAggregateProperty() { + return false; + } + + @Override + public boolean supportsTtl() { + return false; + } + + @Override + public boolean supportsOlapProperties() { + return true; + } + + @Override + public boolean supportsTaskAndServerVertex() { return true; } +} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreMetrics.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreMetrics.java new file mode 100644 index 000000000..c5f180887 --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreMetrics.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore; + +import java.util.List; +import java.util.Map; + +import org.apache.hugegraph.backend.store.BackendMetrics; +import org.apache.hugegraph.util.InsertionOrderUtil; + +public class HstoreMetrics implements BackendMetrics { + + private final List<HstoreSessions> dbs; + private final HstoreSessions.Session session; + + public HstoreMetrics(List<HstoreSessions> dbs, + HstoreSessions.Session session) { + this.dbs = dbs; + this.session = session; + } + + @Override + public Map<String, Object> metrics() { + Map<String, Object> results = InsertionOrderUtil.newMap(); + // TODO(metrics): fetch more metrics from PD + results.put(NODES, session.getActiveStoreSize()); + return results; + } +} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java new file mode 100644 index 000000000..6de800697 --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore; + +import static org.apache.hugegraph.config.OptionChecker.disallowEmpty; + +import org.apache.hugegraph.config.ConfigOption; +import org.apache.hugegraph.config.OptionHolder; + +public class HstoreOptions extends OptionHolder { + + public static final ConfigOption<Integer> PARTITION_COUNT = new ConfigOption<>( + "hstore.partition_count", + "Number of partitions, which PD controls partitions based on.", + disallowEmpty(), + 0 + ); + public static final ConfigOption<Integer> SHARD_COUNT = new ConfigOption<>( + "hstore.shard_count", + "Number of copies, which PD controls partition copies based on.", + disallowEmpty(), + 0 + ); + private static volatile HstoreOptions instance; + + private HstoreOptions() { + super(); + } + + public static synchronized HstoreOptions instance() { + if (instance == null) { + instance = new HstoreOptions(); + instance.registerOptions(); + } + return instance; + } +} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreProvider.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreProvider.java new file mode 100644 index 000000000..f9d48d36c --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreProvider.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore; + +import org.apache.hugegraph.backend.store.AbstractBackendStoreProvider; +import org.apache.hugegraph.backend.store.BackendStore; +import org.apache.hugegraph.config.HugeConfig; + +public class HstoreProvider extends AbstractBackendStoreProvider { + + protected String namespace() { + return this.graph(); + } + + @Override + public String type() { + return "hstore"; + } + + @Override + public String driverVersion() { + return "1.13"; + } + + @Override + protected BackendStore newSchemaStore(HugeConfig config, String store) { + return new HstoreStore.HstoreSchemaStore(this, this.namespace(), store); + } + + @Override + protected BackendStore newGraphStore(HugeConfig config, String store) { + return new HstoreStore.HstoreGraphStore(this, this.namespace(), store); + } + + @Override + protected BackendStore newSystemStore(HugeConfig config, String store) { + return null; + } +} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessions.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessions.java new file mode 100755 index 000000000..0abb6458b --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessions.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hugegraph.backend.query.Query; +import org.apache.hugegraph.backend.store.BackendEntry; +import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator; +import org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession; +import org.apache.hugegraph.backend.store.BackendSessionPool; +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.store.HgOwnerKey; +import org.apache.hugegraph.type.define.GraphMode; + +public abstract class HstoreSessions extends BackendSessionPool { + + public HstoreSessions(HugeConfig config, String database, String store) { + super(config, database + "/" + store); + } + + public abstract Set<String> openedTables(); + + public abstract void createTable(String... tables); + + public abstract void dropTable(String... tables); + + public abstract boolean existsTable(String table); + + public abstract void truncateTable(String table); + + public abstract void clear(); + + @Override + public abstract Session session(); + + public interface Countable { + + public long count(); + } + + /** + * Session for Hstore + */ + public static abstract class Session extends AbstractBackendSession { + + public static final int SCAN_ANY = 0x80; + public static final int SCAN_PREFIX_BEGIN = 0x01; + public static final int SCAN_PREFIX_END = 0x02; + public static final int SCAN_GT_BEGIN = 0x04; + public static final int SCAN_GTE_BEGIN = 0x0c; + public static final int SCAN_LT_END = 0x10; + public static final int SCAN_LTE_END = 0x30; + public static final int SCAN_KEY_ONLY = 0x40; + public static final int SCAN_HASHCODE = 0x100; + + private HugeConfig conf; + private String graphName; + + public static boolean matchScanType(int expected, int actual) { + return (expected & actual) == expected; + } + + public abstract void createTable(String tableName); + + public abstract void dropTable(String tableName); + + public abstract boolean existsTable(String tableName); + + public abstract void truncateTable(String tableName); + + public abstract void deleteGraph(); + + public abstract Pair<byte[], byte[]> keyRange(String table); + + public abstract void put(String table, byte[] ownerKey, + byte[] key, byte[] value); + + public abstract void increase(String table, byte[] ownerKey, + byte[] key, byte[] value); + + public abstract void delete(String table, byte[] ownerKey, byte[] key); + + public abstract void deletePrefix(String table, byte[] ownerKey, + byte[] key); + + public abstract void deleteRange(String table, byte[] ownerKeyFrom, + byte[] ownerKeyTo, byte[] keyFrom, + byte[] keyTo); + + public abstract byte[] get(String table, byte[] key); + + public abstract byte[] get(String table, byte[] ownerKey, byte[] key); + + public abstract BackendColumnIterator scan(String table); + + public abstract BackendColumnIterator scan(String table, + byte[] ownerKey, + byte[] prefix); + + public BackendColumnIterator scan(String table, byte[] ownerKeyFrom, + byte[] ownerKeyTo, byte[] keyFrom, + byte[] keyTo) { + return this.scan(table, ownerKeyFrom, ownerKeyTo, keyFrom, keyTo, + SCAN_LT_END); + } + + public abstract List<BackendColumnIterator> scan(String table, + List<HgOwnerKey> keys, + int scanType, + long limit, + byte[] query); + + public abstract BackendEntry.BackendIterator<BackendColumnIterator> scan(String table, + Iterator<HgOwnerKey> keys, + int scanType, + Query queryParam, + byte[] query); + + public abstract BackendColumnIterator scan(String table, + byte[] ownerKeyFrom, + byte[] ownerKeyTo, + byte[] keyFrom, + byte[] keyTo, + int scanType); + + public abstract BackendColumnIterator scan(String table, + byte[] ownerKeyFrom, + byte[] ownerKeyTo, + byte[] keyFrom, + byte[] keyTo, + int scanType, + byte[] query); + + public abstract BackendColumnIterator scan(String table, + byte[] ownerKeyFrom, + byte[] ownerKeyTo, + byte[] keyFrom, + byte[] keyTo, + int scanType, + byte[] query, + byte[] position); + + public abstract BackendColumnIterator scan(String table, + int codeFrom, + int codeTo, + int scanType, + byte[] query); + + public abstract BackendColumnIterator scan(String table, + int codeFrom, + int codeTo, + int scanType, + byte[] query, + byte[] position); + + public abstract BackendColumnIterator getWithBatch(String table, + List<HgOwnerKey> keys); + + public abstract void merge(String table, byte[] ownerKey, + byte[] key, byte[] value); + + public abstract void setMode(GraphMode mode); + + public abstract void truncate() throws Exception; + + public abstract BackendColumnIterator scan(String table, + byte[] conditionQueryToByte); + + public HugeConfig getConf() { + return conf; + } + + public void setConf(HugeConfig conf) { + this.conf = conf; + } + + public String getGraphName() { + return graphName; + } + + public void setGraphName(String graphName) { + this.graphName = graphName; + } + + public abstract void beginTx(); + + public abstract int getActiveStoreSize(); + } +} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java new file mode 100755 index 000000000..e2ddfd97c --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java @@ -0,0 +1,802 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hugegraph.backend.query.Query; +import org.apache.hugegraph.backend.store.BackendEntry; +import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn; +import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator; +import org.apache.hugegraph.backend.store.BackendEntryIterator; +import org.apache.hugegraph.config.CoreOptions; +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.pd.client.PDClient; +import org.apache.hugegraph.pd.client.PDConfig; +import org.apache.hugegraph.pd.common.PDException; +import org.apache.hugegraph.pd.grpc.Metapb; +import org.apache.hugegraph.store.HgKvEntry; +import org.apache.hugegraph.store.HgKvIterator; +import org.apache.hugegraph.store.HgOwnerKey; +import org.apache.hugegraph.store.HgScanQuery; +import org.apache.hugegraph.store.HgStoreClient; +import org.apache.hugegraph.store.HgStoreSession; +import org.apache.hugegraph.store.client.grpc.KvCloseableIterator; +import org.apache.hugegraph.store.client.util.HgStoreClientConst; +import org.apache.hugegraph.store.grpc.common.ScanOrderType; +import org.apache.hugegraph.testutil.Assert; +import org.apache.hugegraph.type.define.GraphMode; +import org.apache.hugegraph.util.Bytes; +import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.StringEncoding; + +public class HstoreSessionsImpl extends HstoreSessions { + + private static final Set<String> infoInitializedGraph = + Collections.synchronizedSet(new HashSet<>()); + private static int tableCode = 0; + private static volatile Boolean initializedNode = Boolean.FALSE; + private static volatile PDClient defaultPdClient; + private static volatile HgStoreClient hgStoreClient; + private final HugeConfig config; + private final HstoreSession session; + private final Map<String, Integer> tables; + private final AtomicInteger refCount; + private final String graphName; + + public HstoreSessionsImpl(HugeConfig config, String database, + String store) { + super(config, database, store); + this.config = config; + this.graphName = database + "/" + store; + this.initStoreNode(config); + this.session = new HstoreSession(this.config, graphName); + this.tables = new ConcurrentHashMap<>(); + this.refCount = new AtomicInteger(1); + } + + public static HgStoreClient getHgStoreClient() { + return hgStoreClient; + } + + public static PDClient getDefaultPdClient() { + return defaultPdClient; + } + + public static byte[] encode(String string) { + return StringEncoding.encode(string); + } + + public static String decode(byte[] bytes) { + return StringEncoding.decode(bytes); + } + + private void initStoreNode(HugeConfig config) { + if (!initializedNode) { + synchronized (this) { + if (!initializedNode) { + PDConfig pdConfig = + PDConfig.of(config.get(CoreOptions.PD_PEERS)) + .setEnableCache(true); + defaultPdClient = PDClient.create(pdConfig); + hgStoreClient = + HgStoreClient.create(defaultPdClient); + initializedNode = Boolean.TRUE; + } + } + } + } + + @Override + public void open() throws Exception { + if (!infoInitializedGraph.contains(this.graphName)) { + synchronized (infoInitializedGraph) { + if (!infoInitializedGraph.contains(this.graphName)) { + Integer partitionCount = + this.config.get(HstoreOptions.PARTITION_COUNT); + Assert.assertTrue("The value of hstore.partition_count" + + " cannot be less than 0.", + partitionCount > -1); + defaultPdClient.setGraph(Metapb.Graph.newBuilder() + .setGraphName( + this.graphName) + .setPartitionCount( + partitionCount) + .build()); + infoInitializedGraph.add(this.graphName); + } + } + } + this.session.open(); + } + + @Override + protected boolean opened() { + return this.session != null; + } + + @Override + public Set<String> openedTables() { + return this.tables.keySet(); + } + + @Override + public synchronized void createTable(String... tables) { + for (String table : tables) { + this.session.createTable(table); + this.tables.put(table, tableCode++); + } + } + + @Override + public synchronized void dropTable(String... tables) { + for (String table : tables) { + this.session.dropTable(table); + this.tables.remove(table); + } + } + + @Override + public boolean existsTable(String table) { + return this.session.existsTable(table); + } + + @Override + public void truncateTable(String table) { + this.session.truncateTable(table); + } + + @Override + public void clear() { + this.session.deleteGraph(); + try { + hgStoreClient.getPdClient().delGraph(this.graphName); + } catch (PDException e) { + + } + } + + @Override + public final Session session() { + return (Session) super.getOrNewSession(); + } + + @Override + protected final Session newSession() { + return new HstoreSession(this.config(), this.graphName); + } + + @Override + protected synchronized void doClose() { + this.checkValid(); + if (this.refCount != null) { + if (this.refCount.decrementAndGet() > 0) { + return; + } + if (this.refCount.get() != 0) { + return; + } + } + assert this.refCount.get() == 0; + this.tables.clear(); + this.session.close(); + } + + private void checkValid() { + } + + private static class ColumnIterator<T extends HgKvIterator> implements + BackendColumnIterator, + Countable { + + private final T iter; + private final byte[] keyBegin; + private final byte[] keyEnd; + private final int scanType; + private final String table; + private final byte[] value; + private boolean gotNext; + private byte[] position; + + public ColumnIterator(String table, T results) { + this(table, results, null, null, 0); + } + + public ColumnIterator(String table, T results, byte[] keyBegin, + byte[] keyEnd, int scanType) { + E.checkNotNull(results, "results"); + this.table = table; + this.iter = results; + this.keyBegin = keyBegin; + this.keyEnd = keyEnd; + this.scanType = scanType; + this.value = null; + if (this.iter.hasNext()) { + this.iter.next(); + this.gotNext = true; + this.position = iter.position(); + } else { + this.gotNext = false; + // QUESTION: Resetting the position may result in the caller being unable to + // retrieve the corresponding position. + this.position = null; + } + if (!ArrayUtils.isEmpty(this.keyBegin) || + !ArrayUtils.isEmpty(this.keyEnd)) { + this.checkArguments(); + } + + } + + public T iter() { + return iter; + } + + private void checkArguments() { + E.checkArgument(!(this.match(Session.SCAN_PREFIX_BEGIN) && + this.match(Session.SCAN_PREFIX_END)), + "Can't set SCAN_PREFIX_WITH_BEGIN and " + + "SCAN_PREFIX_WITH_END at the same time"); + + E.checkArgument(!(this.match(Session.SCAN_PREFIX_BEGIN) && + this.match(Session.SCAN_GT_BEGIN)), + "Can't set SCAN_PREFIX_WITH_BEGIN and " + + "SCAN_GT_BEGIN/SCAN_GTE_BEGIN at the same time"); + + E.checkArgument(!(this.match(Session.SCAN_PREFIX_END) && + this.match(Session.SCAN_LT_END)), + "Can't set SCAN_PREFIX_WITH_END and " + + "SCAN_LT_END/SCAN_LTE_END at the same time"); + + if (this.match(Session.SCAN_PREFIX_BEGIN) && !matchHash()) { + E.checkArgument(this.keyBegin != null, + "Parameter `keyBegin` can't be null " + + "if set SCAN_PREFIX_WITH_BEGIN"); + E.checkArgument(this.keyEnd == null, + "Parameter `keyEnd` must be null " + + "if set SCAN_PREFIX_WITH_BEGIN"); + } + + if (this.match(Session.SCAN_PREFIX_END) && !matchHash()) { + E.checkArgument(this.keyEnd != null, + "Parameter `keyEnd` can't be null " + + "if set SCAN_PREFIX_WITH_END"); + } + + if (this.match(Session.SCAN_GT_BEGIN) && !matchHash()) { + E.checkArgument(this.keyBegin != null, + "Parameter `keyBegin` can't be null " + + "if set SCAN_GT_BEGIN or SCAN_GTE_BEGIN"); + } + + if (this.match(Session.SCAN_LT_END) && !matchHash()) { + E.checkArgument(this.keyEnd != null, + "Parameter `keyEnd` can't be null " + + "if set SCAN_LT_END or SCAN_LTE_END"); + } + } + + private boolean matchHash() { + return this.scanType == Session.SCAN_HASHCODE; + } + + private boolean match(int expected) { + return Session.matchScanType(expected, this.scanType); + } + + + @Override + public boolean hasNext() { + if (gotNext) { + this.position = this.iter.position(); + } else { + // QUESTION: Resetting the position may result in the caller being unable to + // retrieve the corresponding position. + this.position = null; + } + return gotNext; + } + + private boolean filter(byte[] key) { + if (this.match(Session.SCAN_PREFIX_BEGIN)) { + /* + * Prefix with `keyBegin`? + * TODO: use custom prefix_extractor instead + * or use ReadOptions.prefix_same_as_start + */ + return Bytes.prefixWith(key, this.keyBegin); + } else if (this.match(Session.SCAN_PREFIX_END)) { + /* + * Prefix with `keyEnd`? + * like the following query for range index: + * key > 'age:20' and prefix with 'age' + */ + assert this.keyEnd != null; + return Bytes.prefixWith(key, this.keyEnd); + } else if (this.match(Session.SCAN_LT_END)) { + /* + * Less (equal) than `keyEnd`? + * NOTE: don't use BytewiseComparator due to signed byte + */ + if ((this.scanType | Session.SCAN_HASHCODE) != 0) { + return true; + } + assert this.keyEnd != null; + if (this.match(Session.SCAN_LTE_END)) { + // Just compare the prefix, can be there are excess tail + key = Arrays.copyOfRange(key, 0, this.keyEnd.length); + return Bytes.compare(key, this.keyEnd) <= 0; + } else { + return Bytes.compare(key, this.keyEnd) < 0; + } + } else { + assert this.match(Session.SCAN_ANY) || this.match(Session.SCAN_GT_BEGIN) || + this.match( + Session.SCAN_GTE_BEGIN) : "Unknown scan type"; + return true; + } + } + + @Override + public BackendColumn next() { + BackendEntryIterator.checkInterrupted(); + if (!this.hasNext()) { + throw new NoSuchElementException(); + } + BackendColumn col = + BackendColumn.of(this.iter.key(), + this.iter.value()); + if (this.iter.hasNext()) { + gotNext = true; + this.iter.next(); + } else { + gotNext = false; + } + return col; + } + + @Override + public long count() { + long count = 0L; + while (this.hasNext()) { + this.next(); + count++; + BackendEntryIterator.checkInterrupted(); + } + return count; + } + + @Override + public byte[] position() { + return this.position; + } + + @Override + public void close() { + if (this.iter != null) { + this.iter.close(); + } + } + } + + /** + * HstoreSession implement for hstore + */ + private final class HstoreSession extends Session { + + private static final boolean TRANSACTIONAL = true; + private final HgStoreSession graph; + int changedSize = 0; + + public HstoreSession(HugeConfig conf, String graphName) { + setGraphName(graphName); + setConf(conf); + this.graph = hgStoreClient.openSession(graphName); + } + + @Override + public void open() { + this.opened = true; + } + + @Override + public void close() { + this.opened = false; + } + + @Override + public boolean closed() { + return !this.opened; + } + + @Override + public void reset() { + if (this.changedSize != 0) { + this.rollback(); + this.changedSize = 0; + } + } + + /** + * Any change in the session + */ + @Override + public boolean hasChanges() { + return this.changedSize > 0; + } + + /** + * Commit all updates(put/delete) to DB + */ + @Override + public Integer commit() { + if (!this.hasChanges()) { + // TODO: log a message with level WARNING + return 0; + } + int commitSize = this.changedSize; + if (TRANSACTIONAL) { + this.graph.commit(); + } + this.changedSize = 0; + return commitSize; + } + + /** + * Rollback all updates(put/delete) not committed + */ + @Override + public void rollback() { + if (TRANSACTIONAL) { + this.graph.rollback(); + } + this.changedSize = 0; + } + + @Override + public void createTable(String tableName) { + this.graph.createTable(tableName); + } + + @Override + public void dropTable(String tableName) { + this.graph.dropTable(tableName); + } + + @Override + public boolean existsTable(String tableName) { + return this.graph.existsTable(tableName); + } + + @Override + public void truncateTable(String tableName) { + this.graph.deleteTable(tableName); + } + + @Override + public void deleteGraph() { + this.graph.deleteGraph(this.getGraphName()); + } + + @Override + public Pair<byte[], byte[]> keyRange(String table) { + return null; + } + + private void prepare() { + if (!this.hasChanges() && TRANSACTIONAL) { + this.graph.beginTx(); + } + this.changedSize++; + } + + /** + * Add a KV record to a table + */ + @Override + public void put(String table, byte[] ownerKey, byte[] key, + byte[] value) { + prepare(); + this.graph.put(table, HgOwnerKey.of(ownerKey, key), value); + } + + @Override + public synchronized void increase(String table, byte[] ownerKey, + byte[] key, byte[] value) { + prepare(); + this.graph.merge(table, HgOwnerKey.of(ownerKey, key), value); + } + + @Override + public void delete(String table, byte[] ownerKey, byte[] key) { + prepare(); + this.graph.delete(table, HgOwnerKey.of(ownerKey, key)); + } + + @Override + public void deletePrefix(String table, byte[] ownerKey, byte[] key) { + prepare(); + this.graph.deletePrefix(table, HgOwnerKey.of(ownerKey, key)); + } + + /** + * Delete a range of keys from a table + */ + @Override + public void deleteRange(String table, byte[] ownerKeyFrom, + byte[] ownerKeyTo, byte[] keyFrom, + byte[] keyTo) { + prepare(); + this.graph.deleteRange(table, HgOwnerKey.of(ownerKeyFrom, keyFrom), + HgOwnerKey.of(ownerKeyTo, keyTo)); + } + + @Override + public byte[] get(String table, byte[] key) { + return this.graph.get(table, HgOwnerKey.of( + HgStoreClientConst.ALL_PARTITION_OWNER, key)); + } + + @Override + public byte[] get(String table, byte[] ownerKey, byte[] key) { + byte[] values = this.graph.get(table, HgOwnerKey.of(ownerKey, key)); + return values != null ? values : new byte[0]; + } + + @Override + public void beginTx() { + this.graph.beginTx(); + } + + @Override + public BackendColumnIterator scan(String table) { + assert !this.hasChanges(); + return new ColumnIterator<>(table, this.graph.scanIterator(table)); + } + + @Override + public BackendColumnIterator scan(String table, + byte[] conditionQueryToByte) { + assert !this.hasChanges(); + HgKvIterator results = + this.graph.scanIterator(table, conditionQueryToByte); + return new ColumnIterator<>(table, results); + } + + @Override + public BackendColumnIterator scan(String table, byte[] ownerKey, + byte[] prefix) { + assert !this.hasChanges(); + HgKvIterator<HgKvEntry> result = this.graph.scanIterator(table, + HgOwnerKey.of( + ownerKey, + prefix)); + return new ColumnIterator<>(table, result); + } + + @Override + public List<BackendColumnIterator> scan(String table, + List<HgOwnerKey> keys, + int scanType, long limit, + byte[] query) { + HgScanQuery scanQuery = HgScanQuery.prefixOf(table, keys).builder() + .setScanType(scanType) + .setQuery(query) + .setPerKeyLimit(limit).build(); + List<HgKvIterator<HgKvEntry>> scanIterators = + this.graph.scanBatch(scanQuery); + LinkedList<BackendColumnIterator> columnIterators = + new LinkedList<>(); + scanIterators.forEach(item -> { + columnIterators.add( + new ColumnIterator<>(table, item)); + }); + return columnIterators; + } + + @Override + public BackendEntry.BackendIterator<BackendColumnIterator> scan( + String table, + Iterator<HgOwnerKey> keys, + int scanType, Query queryParam, byte[] query) { + ScanOrderType orderType; + switch (queryParam.orderType()) { + case ORDER_NONE: + orderType = ScanOrderType.ORDER_NONE; + break; + case ORDER_WITHIN_VERTEX: + orderType = ScanOrderType.ORDER_WITHIN_VERTEX; + break; + case ORDER_STRICT: + orderType = ScanOrderType.ORDER_STRICT; + break; + default: + throw new RuntimeException("not implement"); + } + HgScanQuery scanQuery = HgScanQuery.prefixIteratorOf(table, keys) + .builder() + .setScanType(scanType) + .setQuery(query) + .setPerKeyMax(queryParam.limit()) + .setOrderType(orderType) + .setOnlyKey( + !queryParam.withProperties()) + .setSkipDegree( + queryParam.skipDegree()) + .build(); + KvCloseableIterator<HgKvIterator<HgKvEntry>> scanIterators = + this.graph.scanBatch2(scanQuery); + return new BackendEntry.BackendIterator<BackendColumnIterator>() { + @Override + public void close() { + scanIterators.close(); + } + + @Override + public byte[] position() { + throw new NotImplementedException(); + } + + @Override + public boolean hasNext() { + return scanIterators.hasNext(); + } + + @Override + public BackendColumnIterator next() { + return new ColumnIterator<HgKvIterator>(table, + scanIterators.next()); + } + }; + } + + @Override + public BackendColumnIterator scan(String table, byte[] ownerKeyFrom, + byte[] ownerKeyTo, + byte[] keyFrom, byte[] keyTo, + int scanType) { + assert !this.hasChanges(); + HgKvIterator result = this.graph.scanIterator(table, HgOwnerKey.of( + ownerKeyFrom, keyFrom), + HgOwnerKey.of( + ownerKeyTo, + keyTo), 0, + scanType, + null); + return new ColumnIterator<>(table, result, keyFrom, + keyTo, scanType); + } + + @Override + public BackendColumnIterator scan(String table, byte[] ownerKeyFrom, + byte[] ownerKeyTo, + byte[] keyFrom, byte[] keyTo, + int scanType, byte[] query) { + assert !this.hasChanges(); + HgKvIterator<HgKvEntry> result = this.graph.scanIterator(table, + HgOwnerKey.of( + ownerKeyFrom, + keyFrom), + HgOwnerKey.of( + ownerKeyTo, + keyTo), + 0, + scanType, + query); + return new ColumnIterator<>(table, result, keyFrom, keyTo, + scanType); + } + + @Override + public BackendColumnIterator scan(String table, byte[] ownerKeyFrom, + byte[] ownerKeyTo, + byte[] keyFrom, byte[] keyTo, + int scanType, byte[] query, + byte[] position) { + assert !this.hasChanges(); + HgKvIterator<HgKvEntry> result = this.graph.scanIterator(table, + HgOwnerKey.of( + ownerKeyFrom, + keyFrom), + HgOwnerKey.of( + ownerKeyTo, + keyTo), + 0, + scanType, + query); + result.seek(position); + return new ColumnIterator<>(table, result, keyFrom, keyTo, + scanType); + } + + @Override + public BackendColumnIterator scan(String table, int codeFrom, + int codeTo, int scanType, + byte[] query) { + assert !this.hasChanges(); + HgKvIterator<HgKvEntry> iterator = + this.graph.scanIterator(table, codeFrom, codeTo, 256, + new byte[0]); + return new ColumnIterator<>(table, iterator, new byte[0], + new byte[0], scanType); + } + + @Override + public BackendColumnIterator scan(String table, int codeFrom, + int codeTo, int scanType, + byte[] query, byte[] position) { + assert !this.hasChanges(); + HgKvIterator<HgKvEntry> iterator = + this.graph.scanIterator(table, codeFrom, codeTo, 256, + new byte[0]); + iterator.seek(position); + return new ColumnIterator<>(table, iterator, new byte[0], + new byte[0], scanType); + } + + @Override + public BackendColumnIterator getWithBatch(String table, + List<HgOwnerKey> keys) { + assert !this.hasChanges(); + HgKvIterator<HgKvEntry> kvIterator = + this.graph.batchPrefix(table, keys); + return new ColumnIterator<>(table, kvIterator); + } + + @Override + public void merge(String table, byte[] ownerKey, byte[] key, + byte[] value) { + prepare(); + this.graph.merge(table, HgOwnerKey.of(ownerKey, key), value); + } + + @Override + public void setMode(GraphMode mode) { + // no need to set pd mode + } + + @Override + public void truncate() throws Exception { + this.graph.truncate(); + HstoreSessionsImpl.getDefaultPdClient() + .resetIdByKey(this.getGraphName()); + } + + @Override + public int getActiveStoreSize() { + try { + return defaultPdClient.getActiveStores().size(); + } catch (PDException ignore) { + } + return 0; + } + } +} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreStore.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreStore.java new file mode 100644 index 000000000..1127d122e --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreStore.java @@ -0,0 +1,825 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.id.IdGenerator; +import org.apache.hugegraph.backend.query.IdPrefixQuery; +import org.apache.hugegraph.backend.query.IdQuery; +import org.apache.hugegraph.backend.query.Query; +import org.apache.hugegraph.backend.serializer.BinaryBackendEntry; +import org.apache.hugegraph.backend.serializer.BytesBuffer; +import org.apache.hugegraph.backend.serializer.MergeIterator; +import org.apache.hugegraph.backend.store.AbstractBackendStore; +import org.apache.hugegraph.backend.store.BackendAction; +import org.apache.hugegraph.backend.store.BackendEntry; +import org.apache.hugegraph.backend.store.BackendFeatures; +import org.apache.hugegraph.backend.store.BackendMutation; +import org.apache.hugegraph.backend.store.BackendStoreProvider; +import org.apache.hugegraph.backend.store.BackendTable; +import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Session; +import org.apache.hugegraph.config.CoreOptions; +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.iterator.CIter; +import org.apache.hugegraph.type.HugeTableType; +import org.apache.hugegraph.type.HugeType; +import org.apache.hugegraph.type.define.Action; +import org.apache.hugegraph.type.define.GraphMode; +import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +import com.google.common.collect.ImmutableSet; + +public abstract class HstoreStore extends AbstractBackendStore<Session> { + + private static final Logger LOG = Log.logger(HstoreStore.class); + + private static final Set<HugeType> INDEX_TYPES = ImmutableSet.of( + HugeType.SECONDARY_INDEX, HugeType.VERTEX_LABEL_INDEX, + HugeType.EDGE_LABEL_INDEX, HugeType.RANGE_INT_INDEX, + HugeType.RANGE_FLOAT_INDEX, HugeType.RANGE_LONG_INDEX, + HugeType.RANGE_DOUBLE_INDEX, HugeType.SEARCH_INDEX, + HugeType.SHARD_INDEX, HugeType.UNIQUE_INDEX + ); + + private static final BackendFeatures FEATURES = new HstoreFeatures(); + private final String store, namespace; + + private final BackendStoreProvider provider; + private final Map<Integer, HstoreTable> tables; + private final ReadWriteLock storeLock; + private boolean isGraphStore; + private HstoreSessions sessions; + + public HstoreStore(final BackendStoreProvider provider, + String namespace, String store) { + this.tables = new HashMap<>(); + this.provider = provider; + this.namespace = namespace; + this.store = store; + this.sessions = null; + this.storeLock = new ReentrantReadWriteLock(); + this.registerMetaHandlers(); + LOG.debug("Store loaded: {}", store); + } + + private void registerMetaHandlers() { + Supplier<List<HstoreSessions>> dbsGet = () -> { + List<HstoreSessions> dbs = new ArrayList<>(); + dbs.add(this.sessions); + return dbs; + }; + this.registerMetaHandler("metrics", (session, meta, args) -> { + HstoreMetrics metrics = new HstoreMetrics(dbsGet.get(), session); + return metrics.metrics(); + }); + this.registerMetaHandler("mode", (session, meta, args) -> { + E.checkArgument(args.length == 1, + "The args count of %s must be 1", meta); + session.setMode((GraphMode) args[0]); + return null; + }); + } + + protected void registerTableManager(HugeTableType type, HstoreTable table) { + this.tables.put((int) type.code(), table); + } + + @Override + protected final HstoreTable table(HugeType type) { + assert type != null; + HugeTableType table; + switch (type) { + case VERTEX: + table = HugeTableType.VERTEX; + break; + case EDGE_OUT: + table = HugeTableType.OUT_EDGE; + break; + case EDGE_IN: + table = HugeTableType.IN_EDGE; + break; + case OLAP: + table = HugeTableType.OLAP_TABLE; + break; + case TASK: + table = HugeTableType.TASK_INFO_TABLE; + break; + case SERVER: + table = HugeTableType.SERVER_INFO_TABLE; + break; + case SEARCH_INDEX: + case SHARD_INDEX: + case SECONDARY_INDEX: + case RANGE_INT_INDEX: + case RANGE_LONG_INDEX: + case RANGE_FLOAT_INDEX: + case RANGE_DOUBLE_INDEX: + case EDGE_LABEL_INDEX: + case VERTEX_LABEL_INDEX: + case UNIQUE_INDEX: + table = HugeTableType.ALL_INDEX_TABLE; + break; + default: + throw new AssertionError(String.format( + "Invalid type: %s", type)); + } + return this.tables.get((int) table.code()); + } + + protected List<String> tableNames() { + return this.tables.values().stream() + .map(BackendTable::table) + .collect(Collectors.toList()); + } + + @Override + protected Session session(HugeType type) { + this.checkOpened(); + return this.sessions.session(); + } + + public String namespace() { + return this.namespace; + } + + @Override + public String store() { + return this.store; + } + + @Override + public String database() { + return this.namespace; + } + + @Override + public BackendStoreProvider provider() { + return this.provider; + } + + @Override + public BackendFeatures features() { + return FEATURES; + } + + @Override + public synchronized void open(HugeConfig config) { + E.checkNotNull(config, "config"); + + if (this.sessions == null) { + this.sessions = new HstoreSessionsImpl(config, this.namespace, + this.store); + } + + String graphStore = config.get(CoreOptions.STORE_GRAPH); + this.isGraphStore = this.store.equals(graphStore); + assert this.sessions != null; + if (!this.sessions.closed()) { + LOG.debug("Store {} has been opened before", this.store); + this.sessions.useSession(); + return; + } + + try { + // NOTE: won't throw error even if connection refused + this.sessions.open(); + } catch (Exception e) { + LOG.error("Failed to open Hstore '{}':{}", this.store, e); + } + this.sessions.session(); + LOG.debug("Store opened: {}", this.store); + } + + @Override + public void close() { + this.checkOpened(); + this.sessions.close(); + + LOG.debug("Store closed: {}", this.store); + } + + @Override + public boolean opened() { + this.checkConnectionOpened(); + return this.sessions.session().opened(); + } + + @Override + public void mutate(BackendMutation mutation) { + Session session = this.sessions.session(); + assert session.opened(); + Map<HugeType, Map<Id, List<BackendAction>>> mutations = mutation.mutations(); + Set<Map.Entry<HugeType, Map<Id, List<BackendAction>>>> entries = mutations.entrySet(); + for (Map.Entry<HugeType, Map<Id, List<BackendAction>>> entry : entries) { + HugeType key = entry.getKey(); + // in order to obtain the owner efficiently, special for edge + boolean isEdge = key.isEdge(); + HstoreTable hTable = this.table(key); + Map<Id, List<BackendAction>> table = entry.getValue(); + Collection<List<BackendAction>> values = table.values(); + for (List<BackendAction> items : values) { + for (int i = 0; i < items.size(); i++) { + BackendAction item = items.get(i); + // set to ArrayList, use index to get item + this.mutate(session, item, hTable, isEdge); + } + } + } + } + + private void mutate(Session session, BackendAction item, + HstoreTable hTable, boolean isEdge) { + BackendEntry entry = item.entry(); + HstoreTable table; + if (!entry.olap()) { + // Oltp table + table = hTable; + } else { + if (entry.type().isIndex()) { + // Olap index + table = this.table(entry.type()); + } else { + // Olap vertex + table = this.table(HugeType.OLAP); + } + session = this.session(HugeType.OLAP); + } + + if (item.action().code() == Action.INSERT.code()) { + table.insert(session, entry, isEdge); + } else { + if (item.action().code() == Action.APPEND.code()) { + table.append(session, entry); + } else { + switch (item.action()) { + case DELETE: + table.delete(session, entry); + break; + case ELIMINATE: + table.eliminate(session, entry); + break; + case UPDATE_IF_PRESENT: + table.updateIfPresent(session, entry); + break; + case UPDATE_IF_ABSENT: + table.updateIfAbsent(session, entry); + break; + default: + throw new AssertionError(String.format( + "Unsupported mutate action: %s", + item.action())); + } + } + } + } + + private HstoreTable getTableByQuery(Query query) { + HugeType tableType = HstoreTable.tableType(query); + HstoreTable table; + if (query.olap()) { + if (query.resultType().isIndex()) { + // Any index type is ok here + table = this.table(HugeType.SECONDARY_INDEX); + } else { + table = this.table(HugeType.OLAP); + } + } else { + table = this.table(tableType); + } + return table; + } + + @Override + public Iterator<BackendEntry> query(Query query) { + Lock readLock = this.storeLock.readLock(); + readLock.lock(); + try { + this.checkOpened(); + Session session = this.sessions.session(); + HstoreTable table = getTableByQuery(query); + Iterator<BackendEntry> entries = table.query(session, query); + // Merge olap results as needed + entries = getBackendEntryIterator(entries, query); + return entries; + } finally { + readLock.unlock(); + } + } + + // TODO: uncomment later - sub edge labels + //@Override + //public Iterator<Iterator<BackendEntry>> query(Iterator<Query> queries, + // Function<Query, Query> queryWriter, + // HugeGraph hugeGraph) { + // if (queries == null || !queries.hasNext()) { + // return Collections.emptyIterator(); + // } + // + // class QueryWrapper implements Iterator<IdPrefixQuery> { + // Query first; + // final Iterator<Query> queries; + // Iterator<Id> subEls; + // Query preQuery; + // Iterator<IdPrefixQuery> queryListIterator; + // + // QueryWrapper(Iterator<Query> queries, Query first) { + // this.queries = queries; + // this.first = first; + // } + // + // @Override + // public boolean hasNext() { + // return first != null || (this.subEls != null && this.subEls.hasNext()) + // || (queryListIterator != null && queryListIterator.hasNext()) || + // queries.hasNext(); + // } + // + // @Override + // public IdPrefixQuery next() { + // if (queryListIterator != null && queryListIterator.hasNext()) { + // return queryListIterator.next(); + // } + // + // Query q; + // if (first != null) { + // q = first; + // preQuery = q.copy(); + // first = null; + // } else { + // if (this.subEls == null || !this.subEls.hasNext()) { + // q = queries.next(); + // preQuery = q.copy(); + // } else { + // q = preQuery.copy(); + // } + // } + // + // assert q instanceof ConditionQuery; + // ConditionQuery cq = (ConditionQuery) q; + // ConditionQuery originQuery = (ConditionQuery) q.copy(); + // + // List<IdPrefixQuery> queryList = Lists.newArrayList(); + // if (hugeGraph != null) { + // for (ConditionQuery conditionQuery : + // ConditionQueryFlatten.flatten(cq)) { + // Id label = conditionQuery.condition(HugeKeys.LABEL); + // /* 父类型 + sortKeys: g.V("V.id").outE("parentLabel").has + // ("sortKey","value")转成 所有子类型 + sortKeys*/ + // if ((this.subEls == null || + // !this.subEls.hasNext()) && label != null && + // hugeGraph.edgeLabel(label).isFather() && + // conditionQuery.condition(HugeKeys.SUB_LABEL) == + // null && + // conditionQuery.condition(HugeKeys.OWNER_VERTEX) != + // null && + // conditionQuery.condition(HugeKeys.DIRECTION) != + // null && + // matchEdgeSortKeys(conditionQuery, false, + // hugeGraph)) { + // this.subEls = + // getSubLabelsOfParentEl( + // hugeGraph.edgeLabels(), + // label); + // } + // + // if (this.subEls != null && + // this.subEls.hasNext()) { + // conditionQuery.eq(HugeKeys.SUB_LABEL, + // subEls.next()); + // } + // + // HugeType hugeType = conditionQuery.resultType(); + // if (hugeType != null && hugeType.isEdge() && + // !conditionQuery.conditions().isEmpty()) { + // IdPrefixQuery idPrefixQuery = + // (IdPrefixQuery) queryWriter.apply( + // conditionQuery); + // idPrefixQuery.setOriginQuery(originQuery); + // queryList.add(idPrefixQuery); + // } + // } + // + // queryListIterator = queryList.iterator(); + // if (queryListIterator.hasNext()) { + // return queryListIterator.next(); + // } + // } + // + // Id ownerId = cq.condition(HugeKeys.OWNER_VERTEX); + // assert ownerId != null; + // BytesBuffer buffer = + // BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID); + // buffer.writeId(ownerId); + // return new IdPrefixQuery(cq, new BinaryBackendEntry.BinaryId( + // buffer.bytes(), ownerId)); + // } + // + // private boolean matchEdgeSortKeys(ConditionQuery query, + // boolean matchAll, + // HugeGraph graph) { + // assert query.resultType().isEdge(); + // Id label = query.condition(HugeKeys.LABEL); + // if (label == null) { + // return false; + // } + // List<Id> sortKeys = graph.edgeLabel(label).sortKeys(); + // if (sortKeys.isEmpty()) { + // return false; + // } + // Set<Id> queryKeys = query.userpropKeys(); + // for (int i = sortKeys.size(); i > 0; i--) { + // List<Id> subFields = sortKeys.subList(0, i); + // if (queryKeys.containsAll(subFields)) { + // if (queryKeys.size() == subFields.size() || !matchAll) { + // /* + // * Return true if: + // * matchAll=true and all queryKeys are in sortKeys + // * or + // * partial queryKeys are in sortKeys + // */ + // return true; + // } + // } + // } + // return false; + // } + // } + // Query first = queries.next(); + // List<HugeType> typeList = getHugeTypes(first); + // QueryWrapper idPrefixQueries = new QueryWrapper(queries, first); + // + // return query(typeList, idPrefixQueries); + //} + + //private Iterator<Id> getSubLabelsOfParentEl(Collection<EdgeLabel> allEls, + // Id label) { + // List<Id> list = new ArrayList<>(); + // for (EdgeLabel el : allEls) { + // if (el.edgeLabelType().sub() && el.fatherId().equals(label)) { + // list.add(el.id()); + // } + // } + // return list.iterator(); + //} + + public List<CIter<BackendEntry>> query(List<HugeType> typeList, + List<IdPrefixQuery> queries) { + Lock readLock = this.storeLock.readLock(); + readLock.lock(); + LinkedList<CIter<BackendEntry>> results = new LinkedList<>(); + try { + this.checkOpened(); + Session session = this.sessions.session(); + E.checkState(!CollectionUtils.isEmpty(queries) && + !CollectionUtils.isEmpty(typeList), + "Please check query list or type list."); + HstoreTable table = null; + StringBuilder builder = new StringBuilder(); + for (HugeType type : typeList) { + builder.append((table = this.table(type)).table()).append(","); + } + List<Iterator<BackendEntry>> iteratorList = + table.query(session, queries, + builder.substring(0, builder.length() - 1)); + for (int i = 0; i < iteratorList.size(); i++) { + Iterator<BackendEntry> entries = iteratorList.get(i); + // Merge olap results as needed + Query query = queries.get(i); + entries = getBackendEntryIterator(entries, query); + if (entries instanceof CIter) { + results.add((CIter) entries); + } + } + return results; + } finally { + readLock.unlock(); + } + } + + public Iterator<Iterator<BackendEntry>> query(List<HugeType> typeList, + Iterator<IdPrefixQuery> queries) { + Lock readLock = this.storeLock.readLock(); + readLock.lock(); + try { + this.checkOpened(); + Session session = this.sessions.session(); + E.checkState(queries.hasNext() && + !CollectionUtils.isEmpty(typeList), + "Please check query list or type list."); + HstoreTable table = null; + StringBuilder builder = new StringBuilder(); + for (HugeType type : typeList) { + builder.append((table = this.table(type)).table()).append(","); + } + + Iterator<Iterator<BackendEntry>> iterators = + table.query(session, queries, + builder.substring(0, builder.length() - 1)); + + return iterators; + } finally { + readLock.unlock(); + } + } + + private Iterator<BackendEntry> getBackendEntryIterator( + Iterator<BackendEntry> entries, + Query query) { + HstoreTable table; + Set<Id> olapPks = query.olapPks(); + if (this.isGraphStore && !olapPks.isEmpty()) { + List<Iterator<BackendEntry>> iterators = new ArrayList<>(); + for (Id pk : olapPks) { + // 构造olap表查询query condition + Query q = this.constructOlapQueryCondition(pk, query); + table = this.table(HugeType.OLAP); + iterators.add(table.queryOlap(this.session(HugeType.OLAP), q)); + } + entries = new MergeIterator<>(entries, iterators, + BackendEntry::mergeable); + } + return entries; + } + + + /** + * 重新构造 查询olap表 query + * 由于 olap合并成一张表, 在写入olap数据, key在后面增加了pk + * 所以在此进行查询的时候,需要重新构造pk前缀 + * 写入参考 BinarySerializer.writeOlapVertex + * + * @param pk + * @param query + * @return + */ + private Query constructOlapQueryCondition(Id pk, Query query) { + if (query instanceof IdQuery && !CollectionUtils.isEmpty((query).ids())) { + IdQuery q = (IdQuery) query.copy(); + Iterator<Id> iterator = q.ids().iterator(); + LinkedHashSet<Id> linkedHashSet = new LinkedHashSet<>(); + while (iterator.hasNext()) { + Id id = iterator.next(); + if (id instanceof BinaryBackendEntry.BinaryId) { + id = ((BinaryBackendEntry.BinaryId) id).origin(); + } + + // create binary id + BytesBuffer buffer = + BytesBuffer.allocate(1 + pk.length() + 1 + id.length()); + buffer.writeId(pk); + id = new BinaryBackendEntry.BinaryId( + buffer.writeId(id).bytes(), id); + linkedHashSet.add(id); + } + q.resetIds(); + q.query(linkedHashSet); + return q; + } else { + // create binary id + BytesBuffer buffer = BytesBuffer.allocate(1 + pk.length()); + pk = new BinaryBackendEntry.BinaryId( + buffer.writeId(pk).bytes(), pk); + + IdPrefixQuery idPrefixQuery = new IdPrefixQuery(HugeType.OLAP, pk); + return idPrefixQuery; + } + } + + @Override + public Number queryNumber(Query query) { + this.checkOpened(); + + Session session = this.sessions.session(); + HstoreTable table = this.table(HstoreTable.tableType(query)); + return table.queryNumber(session, query); + } + + @Override + public synchronized void init() { + Lock writeLock = this.storeLock.writeLock(); + writeLock.lock(); + try { + // Create tables with main disk + this.sessions.createTable(this.tableNames().toArray(new String[0])); + LOG.debug("Store initialized: {}", this.store); + } finally { + writeLock.unlock(); + } + } + + @Override + public void clear(boolean clearSpace) { + Lock writeLock = this.storeLock.writeLock(); + writeLock.lock(); + try { + // Drop tables with main disk + this.sessions.dropTable(this.tableNames().toArray(new String[0])); + if (clearSpace) { + this.sessions.clear(); + } + LOG.debug("Store cleared: {}", this.store); + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean initialized() { + return true; + } + + @Override + public void truncate() { + try { + this.sessions.session().truncate(); + } catch (Exception e) { + LOG.error("Store truncated failed", e); + return; + } + LOG.debug("Store truncated: {}", this.store); + } + + @Override + public void beginTx() { + this.sessions.session().beginTx(); + } + + @Override + public void commitTx() { + this.checkOpened(); + Session session = this.sessions.session(); + session.commit(); + } + + @Override + public void rollbackTx() { + this.checkOpened(); + Session session = this.sessions.session(); + session.rollback(); + } + + private void checkConnectionOpened() { + } + + @Override + public Id nextId(HugeType type) { + long counter = 0L; + counter = this.getCounter(type); + E.checkState(counter != 0L, "Please check whether '%s' is OK", + this.provider().type()); + return IdGenerator.of(counter); + } + + @Override + public void setCounterLowest(HugeType type, long lowest) { + this.increaseCounter(type, lowest); + } + + /***************************** Store defines *****************************/ + + public static class HstoreSchemaStore extends HstoreStore { + + public HstoreSchemaStore(BackendStoreProvider provider, String namespace, String store) { + super(provider, namespace, store); + } + + @Override + public boolean isSchemaStore() { + return true; + } + + @Override + public void increaseCounter(HugeType type, long num) { + throw new UnsupportedOperationException( + "HstoreSchemaStore.increaseCounter()"); + } + + @Override + public long getCounter(HugeType type) { + throw new UnsupportedOperationException( + "HstoreSchemaStore.getCounter()"); + } + } + + public static class HstoreGraphStore extends HstoreStore { + + public HstoreGraphStore(BackendStoreProvider provider, + String namespace, String store) { + super(provider, namespace, store); + + registerTableManager(HugeTableType.VERTEX, + new HstoreTables.Vertex(store)); + registerTableManager(HugeTableType.OUT_EDGE, + HstoreTables.Edge.out(store)); + registerTableManager(HugeTableType.IN_EDGE, + HstoreTables.Edge.in(store)); + registerTableManager(HugeTableType.ALL_INDEX_TABLE, + new HstoreTables.IndexTable(store)); + registerTableManager(HugeTableType.OLAP_TABLE, + new HstoreTables.OlapTable(store)); + registerTableManager(HugeTableType.TASK_INFO_TABLE, + new HstoreTables.TaskInfo(store)); + registerTableManager(HugeTableType.SERVER_INFO_TABLE, + new HstoreTables.ServerInfo(store)); + } + + @Override + public boolean isSchemaStore() { + return false; + } + + @Override + public Id nextId(HugeType type) { + throw new UnsupportedOperationException( + "HstoreGraphStore.nextId()"); + } + + @Override + public void increaseCounter(HugeType type, long num) { + throw new UnsupportedOperationException( + "HstoreGraphStore.increaseCounter()"); + } + + @Override + public long getCounter(HugeType type) { + throw new UnsupportedOperationException( + "HstoreGraphStore.getCounter()"); + } + + @Override + public void createOlapTable(Id pkId) { + HstoreTable table = new HstoreTables.OlapTable(this.store()); + LOG.info("Hstore create olap table {}", table.table()); + super.sessions.createTable(table.table()); + LOG.info("Hstore finish create olap table"); + registerTableManager(HugeTableType.OLAP_TABLE, table); + LOG.info("OLAP table {} has been created", table.table()); + } + + @Override + public void checkAndRegisterOlapTable(Id pkId) { + HstoreTable table = new HstoreTables.OlapTable(this.store()); + if (!super.sessions.existsTable(table.table())) { + LOG.error("Found exception: Table '{}' doesn't exist, we'll " + + "recreate it now. Please carefully check the recent" + + "operation in server and computer, then ensure the " + + "integrity of store file.", table.table()); + this.createOlapTable(pkId); + } else { + registerTableManager(HugeTableType.OLAP_TABLE, table); + } + } + + @Override + public void clearOlapTable(Id pkId) { + } + + @Override + public void removeOlapTable(Id pkId) { + } + + @Override + public boolean existOlapTable(Id pkId) { + String tableName = this.olapTableName(pkId); + return super.sessions.existsTable(tableName); + } + } + + @Override + public String storedVersion() { + return "1.13"; + } +} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTable.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTable.java new file mode 100755 index 000000000..39e24a1d9 --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTable.java @@ -0,0 +1,732 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.backend.id.EdgeId; +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.page.PageState; +import org.apache.hugegraph.backend.query.Aggregate; +import org.apache.hugegraph.backend.query.Aggregate.AggregateFunc; +import org.apache.hugegraph.backend.query.Condition; +import org.apache.hugegraph.backend.query.Condition.Relation; +import org.apache.hugegraph.backend.query.ConditionQuery; +import org.apache.hugegraph.backend.query.IdPrefixQuery; +import org.apache.hugegraph.backend.query.IdRangeQuery; +import org.apache.hugegraph.backend.query.Query; +import org.apache.hugegraph.backend.serializer.BinaryBackendEntry; +import org.apache.hugegraph.backend.serializer.BinaryEntryIterator; +import org.apache.hugegraph.backend.store.BackendEntry; +import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn; +import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator; +import org.apache.hugegraph.backend.store.BackendEntryIterator; +import org.apache.hugegraph.backend.store.BackendTable; +import org.apache.hugegraph.backend.store.Shard; +import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Countable; +import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Session; +import org.apache.hugegraph.exception.NotSupportException; +import org.apache.hugegraph.pd.client.PDClient; +import org.apache.hugegraph.pd.common.PDException; +import org.apache.hugegraph.pd.grpc.Metapb; +import org.apache.hugegraph.store.HgOwnerKey; +import org.apache.hugegraph.store.client.util.HgStoreClientConst; +import org.apache.hugegraph.type.HugeType; +import org.apache.hugegraph.type.define.HugeKeys; +import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.Log; +import org.apache.hugegraph.util.StringEncoding; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import org.slf4j.Logger; + +public class HstoreTable extends BackendTable<Session, BackendEntry> { + + private static final Logger LOG = Log.logger(HstoreStore.class); + + private final HstoreShardSplitter shardSpliter; + Function<BackendEntry, byte[]> ownerDelegate = (entry) -> getOwner(entry); + Function<Id, byte[]> ownerByIdDelegate = (id) -> getOwnerId(id); + BiFunction<HugeType, Id, byte[]> ownerByQueryDelegate = + (type, id) -> getOwnerId(type, id); + Supplier<byte[]> ownerScanDelegate = + () -> HgStoreClientConst.ALL_PARTITION_OWNER; + + public HstoreTable(String database, String table) { + super(String.format("%s+%s", database, table)); + this.shardSpliter = new HstoreShardSplitter(this.table()); + } + + public static ConditionQuery removeDirectionCondition(ConditionQuery conditionQuery) { + Collection<Condition> conditions = conditionQuery.conditions(); + List<Condition> newConditions = new ArrayList<>(); + for (Condition condition : conditions) { + if (!direction(condition)) { + newConditions.add(condition); + } + } + if (newConditions.size() > 0) { + conditionQuery.resetConditions(newConditions); + return conditionQuery; + } else { + return null; + } + } + + private static boolean direction(Condition condition) { + boolean direction = true; + List<? extends Relation> relations = condition.relations(); + for (Relation r : relations) { + if (!r.key().equals(HugeKeys.DIRECTION)) { + direction = false; + break; + } + } + return direction; + } + + protected static BackendEntryIterator newEntryIterator( + BackendColumnIterator cols, Query query) { + return new BinaryEntryIterator<>(cols, query, (entry, col) -> { + if (entry == null || !entry.belongToMe(col)) { + HugeType type = query.resultType(); + // NOTE: only support BinaryBackendEntry currently + entry = new BinaryBackendEntry(type, col.name); + } + entry.columns(col); + return entry; + }); + } + + protected static BackendEntryIterator newEntryIteratorOlap( + BackendColumnIterator cols, Query query, boolean isOlap) { + return new BinaryEntryIterator<>(cols, query, (entry, col) -> { + if (entry == null || !entry.belongToMe(col)) { + HugeType type = query.resultType(); + // NOTE: only support BinaryBackendEntry currently + entry = new BinaryBackendEntry(type, col.name, false, isOlap); + } + entry.columns(col); + return entry; + }); + } + + public static String bytes2String(byte[] bytes) { + StringBuilder result = new StringBuilder(); + for (byte b : bytes) { + String st = String.format("%02x", b); + result.append(st); + } + return result.toString(); + } + + @Override + protected void registerMetaHandlers() { + this.registerMetaHandler("splits", (session, meta, args) -> { + E.checkArgument(args.length == 1, + "The args count of %s must be 1", meta); + long splitSize = (long) args[0]; + return this.shardSpliter.getSplits(session, splitSize); + }); + } + + @Override + public void init(Session session) { + // pass + } + + @Override + public void clear(Session session) { + // pass + } + + public boolean isOlap() { + return false; + } + + private byte[] getOwner(BackendEntry entry) { + if (entry == null) { + return HgStoreClientConst.ALL_PARTITION_OWNER; + } + Id id = entry.type().isIndex() ? entry.id() : entry.originId(); + return getOwnerId(id); + } + + public Supplier<byte[]> getOwnerScanDelegate() { + return ownerScanDelegate; + } + + public byte[] getInsertEdgeOwner(BackendEntry entry) { + Id id = entry.originId(); + id = ((EdgeId) id).ownerVertexId(); + return id.asBytes(); + } + + public byte[] getInsertOwner(BackendEntry entry) { + // 为适应label索引散列,不聚焦在一个分区 + if (entry.type().isLabelIndex() && (entry.columns().size() == 1)) { + Iterator<BackendColumn> iterator = entry.columns().iterator(); + while (iterator.hasNext()) { + BackendColumn next = iterator.next(); + return next.name; + } + } + + Id id = entry.type().isIndex() ? entry.id() : entry.originId(); + return getOwnerId(id); + } + + /** + * 返回Id所属的点ID + * + * @param id + * @return + */ + protected byte[] getOwnerId(Id id) { + if (id instanceof BinaryBackendEntry.BinaryId) { + id = ((BinaryBackendEntry.BinaryId) id).origin(); + } + if (id != null && id.edge()) { + id = ((EdgeId) id).ownerVertexId(); + } + return id != null ? id.asBytes() : + HgStoreClientConst.ALL_PARTITION_OWNER; + } + + /** + * 返回Id所属的点ID + * + * @param id + * @return + */ + protected byte[] getOwnerId(HugeType type, Id id) { + if (type.equals(HugeType.VERTEX) || type.equals(HugeType.EDGE) || + type.equals(HugeType.EDGE_OUT) || type.equals(HugeType.EDGE_IN) || + type.equals(HugeType.COUNTER)) { + return getOwnerId(id); + } else { + return HgStoreClientConst.ALL_PARTITION_OWNER; + } + } + + @Override + public void insert(Session session, BackendEntry entry) { + byte[] owner = entry.type().isEdge() ? getInsertEdgeOwner(entry) : getInsertOwner(entry); + ArrayList<BackendColumn> columns = new ArrayList<>(entry.columns()); + for (int i = 0; i < columns.size(); i++) { + BackendColumn col = columns.get(i); + session.put(this.table(), owner, col.name, col.value); + } + } + + public void insert(Session session, BackendEntry entry, boolean isEdge) { + byte[] owner = isEdge ? getInsertEdgeOwner(entry) : getInsertOwner(entry); + ArrayList<BackendColumn> columns = new ArrayList<>(entry.columns()); + for (int i = 0; i < columns.size(); i++) { + BackendColumn col = columns.get(i); + session.put(this.table(), owner, col.name, col.value); + } + } + + @Override + public void delete(Session session, BackendEntry entry) { + byte[] ownerKey = ownerDelegate.apply(entry); + if (entry.columns().isEmpty()) { + byte[] idBytes = entry.id().asBytes(); + // LOG.debug("Delete from {} with owner {}, id: {}", + // this.table(), bytes2String(ownerKey), idBytes); + session.delete(this.table(), ownerKey, idBytes); + } else { + for (BackendColumn col : entry.columns()) { + // LOG.debug("Delete from {} with owner {}, id: {}", + // this.table(), bytes2String(ownerKey), + // bytes2String(col.name)); + assert entry.belongToMe(col) : entry; + session.delete(this.table(), ownerKey, col.name); + } + } + } + + @Override + public void append(Session session, BackendEntry entry) { + assert entry.columns().size() == 1; + this.insert(session, entry); + } + + @Override + public void eliminate(Session session, BackendEntry entry) { + assert entry.columns().size() == 1; + this.delete(session, entry); + } + + @Override + public boolean queryExist(Session session, BackendEntry entry) { + Id id = entry.id(); + try (BackendColumnIterator iter = this.queryById(session, id)) { + return iter.hasNext(); + } + } + + @Override + public Number queryNumber(Session session, Query query) { + Aggregate aggregate = query.aggregateNotNull(); + if (aggregate.func() != AggregateFunc.COUNT) { + throw new NotSupportException(aggregate.toString()); + } + + assert aggregate.func() == AggregateFunc.COUNT; + assert query.noLimit(); + Iterator<BackendColumn> results = this.queryBy(session, query); + if (results instanceof Countable) { + return ((Countable) results).count(); + } + return IteratorUtils.count(results); + } + + @Override + public Iterator<BackendEntry> query(Session session, Query query) { + if (query.limit() == 0L && !query.noLimit()) { + // LOG.debug("Return empty result(limit=0) for query {}", query); + return Collections.emptyIterator(); + } + return newEntryIterator(this.queryBy(session, query), query); + } + + @Override + public Iterator<BackendEntry> queryOlap(Session session, Query query) { + if (query.limit() == 0L && !query.noLimit()) { + // LOG.debug("Return empty result(limit=0) for query {}", query); + return Collections.emptyIterator(); + } + return newEntryIteratorOlap(this.queryBy(session, query), query, true); + } + + public List<Iterator<BackendEntry>> query(Session session, + List<IdPrefixQuery> queries, + String tableName) { + List<BackendColumnIterator> queryByPrefixList = + this.queryByPrefixList(session, queries, tableName); + LinkedList<Iterator<BackendEntry>> iterators = new LinkedList<>(); + for (int i = 0; i < queryByPrefixList.size(); i++) { + IdPrefixQuery q = queries.get(i).copy(); + q.capacity(Query.NO_CAPACITY); + q.limit(Query.NO_LIMIT); + BackendEntryIterator iterator = + newEntryIterator(queryByPrefixList.get(i), q); + iterators.add(iterator); + } + return iterators; + } + + public BackendEntry.BackendIterator<Iterator<BackendEntry>> query(Session session, + Iterator<IdPrefixQuery> queries, + String tableName) { + final IdPrefixQuery[] first = {queries.next()}; + int type = first[0].withProperties() ? 0 : Session.SCAN_KEY_ONLY; + + IdPrefixQuery queryTmpl = first[0].copy(); + queryTmpl.capacity(Query.NO_CAPACITY); + queryTmpl.limit(Query.NO_LIMIT); + + ConditionQuery originQuery = (ConditionQuery) first[0].originQuery(); + if (originQuery != null) { + originQuery = prepareConditionQueryList(originQuery); + } + byte[] queryBytes = originQuery == null ? null : originQuery.bytes(); + + BackendEntry.BackendIterator<BackendColumnIterator> it + = session.scan(tableName, new Iterator<HgOwnerKey>() { + @Override + public boolean hasNext() { + if (first[0] != null) { + return true; + } + return queries.hasNext(); + } + + @Override + public HgOwnerKey next() { + IdPrefixQuery query = first[0] != null ? first[0] : queries.next(); + first[0] = null; + byte[] prefix = ownerByQueryDelegate.apply(query.resultType(), + query.prefix()); + return HgOwnerKey.of(prefix, query.prefix().asBytes()); + } + }, type, first[0], queryBytes); + return new BackendEntry.BackendIterator<Iterator<BackendEntry>>() { + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public Iterator<BackendEntry> next() { + BackendEntryIterator iterator = newEntryIterator(it.next(), queryTmpl); + return iterator; + } + + @Override + public void close() { + it.close(); + } + + @Override + public byte[] position() { + return new byte[0]; + } + }; + } + + protected BackendColumnIterator queryBy(Session session, Query query) { + // Query all + if (query.empty()) { + return this.queryAll(session, query); + } + + // Query by prefix + if (query instanceof IdPrefixQuery) { + IdPrefixQuery pq = (IdPrefixQuery) query; + return this.queryByPrefix(session, pq); + } + + // Query by range + if (query instanceof IdRangeQuery) { + IdRangeQuery rq = (IdRangeQuery) query; + return this.queryByRange(session, rq); + } + + // Query by id + if (query.conditions().isEmpty()) { + assert !query.ids().isEmpty(); + // 单个id查询 走get接口查询 + if (query.ids().size() == 1) { + return this.getById(session, query.ids().iterator().next()); + } + // NOTE: this will lead to lazy create rocksdb iterator + LinkedList<HgOwnerKey> hgOwnerKeys = new LinkedList<>(); + for (Id id : query.ids()) { + hgOwnerKeys.add(HgOwnerKey.of(this.ownerByIdDelegate.apply(id), + id.asBytes())); + } + BackendColumnIterator withBatch = session.getWithBatch(this.table(), + hgOwnerKeys); + return BackendColumnIterator.wrap(withBatch); + } + + // Query by condition (or condition + id) + ConditionQuery cq = (ConditionQuery) query; + return this.queryByCond(session, cq); + } + + protected BackendColumnIterator queryAll(Session session, Query query) { + if (query.paging()) { + PageState page = PageState.fromString(query.page()); + byte[] ownerKey = this.getOwnerScanDelegate().get(); + int scanType = Session.SCAN_ANY | + (query.withProperties() ? 0 : Session.SCAN_KEY_ONLY); + byte[] queryBytes = query instanceof ConditionQuery ? + ((ConditionQuery) query).bytes() : null; + // LOG.debug("query {} with ownerKeyFrom: {}, ownerKeyTo: {}, " + + // "keyFrom: null, keyTo: null, scanType: {}, " + + // "conditionQuery: {}, position: {}", + // this.table(), bytes2String(ownerKey), + // bytes2String(ownerKey), scanType, + // queryBytes, page.position()); + return session.scan(this.table(), ownerKey, ownerKey, null, + null, scanType, queryBytes, + page.position()); + } + return session.scan(this.table(), + query instanceof ConditionQuery ? + ((ConditionQuery) query).bytes() : null); + } + + protected BackendColumnIterator queryById(Session session, Id id) { + // TODO: change to get() after vertex and schema don't use id prefix + return session.scan(this.table(), this.ownerByIdDelegate.apply(id), + id.asBytes()); + } + + protected BackendColumnIterator getById(Session session, Id id) { + byte[] value = session.get(this.table(), + this.ownerByIdDelegate.apply(id), + id.asBytes()); + if (value.length == 0) { + return BackendColumnIterator.empty(); + } + BackendColumn col = BackendColumn.of(id.asBytes(), value); + return BackendColumnIterator.iterator(col); + } + + protected BackendColumnIterator queryByPrefix(Session session, + IdPrefixQuery query) { + int type = query.inclusiveStart() ? + Session.SCAN_GTE_BEGIN : Session.SCAN_GT_BEGIN; + type |= Session.SCAN_PREFIX_END; + byte[] position = null; + if (query.paging()) { + position = PageState.fromString(query.page()).position(); + } + ConditionQuery originQuery = (ConditionQuery) query.originQuery(); + if (originQuery != null) { + originQuery = prepareConditionQuery(originQuery); + } + byte[] ownerKeyFrom = this.ownerByQueryDelegate.apply(query.resultType(), + query.start()); + byte[] ownerKeyTo = this.ownerByQueryDelegate.apply(query.resultType(), + query.prefix()); + byte[] keyFrom = query.start().asBytes(); + // 前缀分页查询中, start为最初的位置。因为在不同的分区 都是从start位置开始查询 + if (query.paging()) { + keyFrom = query.prefix().asBytes(); + } + byte[] keyTo = query.prefix().asBytes(); + byte[] queryBytes = originQuery == null ? + null : + originQuery.bytes(); + + // LOG.debug("query {} with ownerKeyFrom: {}, ownerKeyTo: {}," + + // "keyFrom: {}, keyTo: {}, scanType: {}, conditionQuery: {}," + + // "position: {}", + // this.table(), bytes2String(ownerKeyFrom), + // bytes2String(ownerKeyTo), bytes2String(keyFrom), + // bytes2String(keyTo), type, originQuery, position); + + return session.scan(this.table(), ownerKeyFrom, ownerKeyTo, keyFrom, + keyTo, type, queryBytes, position); + } + + protected List<BackendColumnIterator> queryByPrefixList( + Session session, + List<IdPrefixQuery> queries, + String tableName) { + E.checkArgument(queries.size() > 0, + "The size of queries must be greater than zero"); + IdPrefixQuery query = queries.get(0); + int type = 0; + LinkedList<HgOwnerKey> ownerKey = new LinkedList<>(); + queries.forEach((item) -> { + byte[] prefix = this.ownerByQueryDelegate.apply(item.resultType(), + item.prefix()); + ownerKey.add(HgOwnerKey.of(prefix, item.prefix().asBytes())); + }); + ConditionQuery originQuery = (ConditionQuery) query.originQuery(); + if (originQuery != null) { + originQuery = prepareConditionQueryList(originQuery); + } + byte[] queryBytes = originQuery == null ? null : originQuery.bytes(); + + // LOG.debug("query {} with scanType: {}, limit: {}, conditionQuery: + // {}", this.table(), type, query.limit(), queryBytes); + return session.scan(tableName, ownerKey, type, + query.limit(), queryBytes); + } + + /*** + * Prepare ConditionQuery to do operator sinking, because some scenes do not need to be + * preserved + * @param conditionQuery + * @return + */ + private ConditionQuery prepareConditionQuery(ConditionQuery conditionQuery) { + if (CollectionUtils.isEmpty(conditionQuery.userpropConditions())) { + return null; + } + // only userpropConditions can send to store + Collection<Condition> conditions = conditionQuery.conditions(); + List<Condition> newConditions = new ArrayList<>(); + for (Condition condition : conditions) { + if (!onlyOwnerVertex(condition)) { + newConditions.add(condition); + } + } + if (newConditions.size() > 0) { + conditionQuery.resetConditions(newConditions); + return conditionQuery; + } else { + return null; + } + } + + /*** + * Prepare ConditionQuery to do operator sinking, because some scenes do not need to be + * preserved + * @param conditionQuery + * @return + */ + private ConditionQuery prepareConditionQueryList(ConditionQuery conditionQuery) { + if (!conditionQuery.containsLabelOrUserpropRelation()) { + return null; + } + // only userpropConditions can send to store + Collection<Condition> conditions = conditionQuery.conditions(); + List<Condition> newConditions = new ArrayList<>(); + for (Condition condition : conditions) { + if (!onlyOwnerVertex(condition)) { + newConditions.add(condition); + } + } + if (newConditions.size() > 0) { + conditionQuery.resetConditions(newConditions); + return conditionQuery; + } else { + return null; + } + } + + private boolean onlyOwnerVertex(Condition condition) { + boolean onlyOwnerVertex = true; + List<? extends Relation> relations = condition.relations(); + for (Relation r : relations) { + if (!r.key().equals(HugeKeys.OWNER_VERTEX)) { + onlyOwnerVertex = false; + break; + } + } + return onlyOwnerVertex; + } + + protected BackendColumnIterator queryByRange(Session session, + IdRangeQuery query) { + byte[] start = query.start().asBytes(); + byte[] end = query.end() == null ? null : query.end().asBytes(); + int type = query.inclusiveStart() ? + Session.SCAN_GTE_BEGIN : Session.SCAN_GT_BEGIN; + if (end != null) { + type |= query.inclusiveEnd() ? + Session.SCAN_LTE_END : Session.SCAN_LT_END; + } + ConditionQuery cq; + Query origin = query.originQuery(); + byte[] position = null; + if (query.paging() && !query.page().isEmpty()) { + position = PageState.fromString(query.page()).position(); + } + byte[] ownerStart = this.ownerByQueryDelegate.apply(query.resultType(), + query.start()); + byte[] ownerEnd = this.ownerByQueryDelegate.apply(query.resultType(), + query.end()); + if (origin instanceof ConditionQuery && + (query.resultType().isEdge() || query.resultType().isVertex())) { + cq = (ConditionQuery) query.originQuery(); + + // LOG.debug("query {} with ownerKeyFrom: {}, ownerKeyTo: {}, " + + // "keyFrom: {}, keyTo: {}, " + + // "scanType: {}, conditionQuery: {}", + // this.table(), bytes2String(ownerStart), + // bytes2String(ownerEnd), bytes2String(start), + // bytes2String(end), type, cq.bytes()); + return session.scan(this.table(), ownerStart, + ownerEnd, start, end, type, cq.bytes(), position); + } + return session.scan(this.table(), ownerStart, + ownerEnd, start, end, type, null, position); + } + + protected BackendColumnIterator queryByCond(Session session, + ConditionQuery query) { + if (query.containsScanCondition()) { + E.checkArgument(query.relations().size() == 1, + "Invalid scan with multi conditions: %s", query); + Relation scan = query.relations().iterator().next(); + Shard shard = (Shard) scan.value(); + return this.queryByRange(session, shard, query); + } + // throw new NotSupportException("query: %s", query); + return this.queryAll(session, query); + } + + protected BackendColumnIterator queryByRange(Session session, Shard shard, + ConditionQuery query) { + int type = Session.SCAN_GTE_BEGIN; + type |= Session.SCAN_LT_END; + type |= Session.SCAN_HASHCODE; + type |= query.withProperties() ? 0 : Session.SCAN_KEY_ONLY; + + int start = Integer.parseInt(StringUtils.isEmpty(shard.start()) ? + "0" : shard.start()); + int end = Integer.parseInt(StringUtils.isEmpty(shard.end()) ? + "0" : shard.end()); + byte[] queryBytes = query.bytes(); + String page = query.page(); + if (page != null && !page.isEmpty()) { + byte[] position = PageState.fromString(page).position(); + return session.scan(this.table(), start, end, type, queryBytes, + position); + } + return session.scan(this.table(), start, end, type, queryBytes); + } + + private static class HstoreShardSplitter extends ShardSplitter<Session> { + + public HstoreShardSplitter(String table) { + super(table); + } + + @Override + public List<Shard> getSplits(Session session, long splitSize) { + E.checkArgument(splitSize >= MIN_SHARD_SIZE, + "The split-size must be >= %s bytes, but got %s", + MIN_SHARD_SIZE, splitSize); + + List<Shard> splits = new ArrayList<>(); + try { + PDClient pdClient = HstoreSessionsImpl.getDefaultPdClient(); + List<Metapb.Partition> partitions = pdClient.getPartitions(0, + session.getGraphName()); + for (Metapb.Partition partition : partitions) { + String start = String.valueOf(partition.getStartKey()); + String end = String.valueOf(partition.getEndKey()); + splits.add(new Shard(start, end, 0)); + } + } catch (PDException e) { + e.printStackTrace(); + } + + return splits.size() != 0 ? + splits : super.getSplits(session, splitSize); + } + + @Override + public long estimateDataSize(Session session) { + return 1L; + } + + @Override + public long estimateNumKeys(Session session) { + return 1L; + } + + @Override + public byte[] position(String position) { + if (END.equals(position)) { + return null; + } + return StringEncoding.decodeBase64(position); + } + } +} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTables.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTables.java new file mode 100644 index 000000000..f4ebf7ebe --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTables.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore; + +import java.util.List; + +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.query.Condition; +import org.apache.hugegraph.backend.query.Condition.Relation; +import org.apache.hugegraph.backend.query.ConditionQuery; +import org.apache.hugegraph.backend.serializer.BinarySerializer; +import org.apache.hugegraph.backend.store.BackendEntry; +import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator; +import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Session; +import org.apache.hugegraph.type.HugeTableType; +import org.apache.hugegraph.type.HugeType; +import org.apache.hugegraph.type.define.HugeKeys; +import org.apache.hugegraph.util.E; + +public class HstoreTables { + + public static class Vertex extends HstoreTable { + + public static final String TABLE = HugeTableType.VERTEX.string(); + + public Vertex(String database) { + super(database, TABLE); + } + + @Override + protected BackendColumnIterator queryById(Session session, Id id) { + return this.getById(session, id); + } + } + + /** + * task信息存储表 + */ + public static class TaskInfo extends HstoreTable { + public static final String TABLE = HugeTableType.TASK_INFO_TABLE.string(); + + public TaskInfo(String database) { + super(database, TABLE); + } + + @Override + protected BackendColumnIterator queryById(Session session, Id id) { + return this.getById(session, id); + } + } + + public static class ServerInfo extends HstoreTable { + public static final String TABLE = HugeTableType.SERVER_INFO_TABLE.string(); + + public ServerInfo(String database) { + super(database, TABLE); + } + + @Override + protected BackendColumnIterator queryById(Session session, Id id) { + return this.getById(session, id); + } + } + + public static class Edge extends HstoreTable { + + public static final String TABLE_SUFFIX = HugeType.EDGE.string(); + + public Edge(boolean out, String database) { + // Edge out/in table + super(database, (out ? HugeTableType.OUT_EDGE.string() : + HugeTableType.IN_EDGE.string())); + } + + public static Edge out(String database) { + return new Edge(true, database); + } + + public static Edge in(String database) { + return new Edge(false, database); + } + + @Override + protected BackendColumnIterator queryById(Session session, Id id) { + return this.getById(session, id); + } + } + + public static class IndexTable extends HstoreTable { + + public static final String TABLE = HugeTableType.ALL_INDEX_TABLE.string(); + + public IndexTable(String database) { + super(database, TABLE); + } + + @Override + public void eliminate(Session session, BackendEntry entry) { + assert entry.columns().size() == 1; + super.delete(session, entry); + } + + @Override + public void delete(Session session, BackendEntry entry) { + /* + * Only delete index by label will come here + * Regular index delete will call eliminate() + */ + byte[] ownerKey = super.ownerDelegate.apply(entry); + for (BackendEntry.BackendColumn column : entry.columns()) { + // Don't assert entry.belongToMe(column), length-prefix is 1* + session.deletePrefix(this.table(), ownerKey, column.name); + } + } + + /** + * 主要用于 range类型的index处理 + * + * @param session + * @param query + * @return + */ + @Override + protected BackendColumnIterator queryByCond(Session session, + ConditionQuery query) { + assert !query.conditions().isEmpty(); + + List<Condition> conds = query.syspropConditions(HugeKeys.ID); + E.checkArgument(!conds.isEmpty(), + "Please specify the index conditions"); + + Id prefix = null; + Id min = null; + boolean minEq = false; + Id max = null; + boolean maxEq = false; + + for (Condition c : conds) { + Relation r = (Relation) c; + switch (r.relation()) { + case PREFIX: + prefix = (Id) r.value(); + break; + case GTE: + minEq = true; + case GT: + min = (Id) r.value(); + break; + case LTE: + maxEq = true; + case LT: + max = (Id) r.value(); + break; + default: + E.checkArgument(false, "Unsupported relation '%s'", + r.relation()); + } + } + + E.checkArgumentNotNull(min, "Range index begin key is missing"); + byte[] begin = min.asBytes(); + if (!minEq) { + BinarySerializer.increaseOne(begin); + } + byte[] ownerStart = this.ownerScanDelegate.get(); + byte[] ownerEnd = this.ownerScanDelegate.get(); + if (max == null) { + E.checkArgumentNotNull(prefix, "Range index prefix is missing"); + return session.scan(this.table(), ownerStart, ownerEnd, begin, + prefix.asBytes(), Session.SCAN_PREFIX_END); + } else { + byte[] end = max.asBytes(); + int type = maxEq ? Session.SCAN_LTE_END : Session.SCAN_LT_END; + return session.scan(this.table(), ownerStart, + ownerEnd, begin, end, type); + } + } + } + + public static class OlapTable extends HstoreTable { + + public static final String TABLE = HugeTableType.OLAP_TABLE.string(); + + public OlapTable(String database) { + // 由原先多个ap_{pk_id} 合并成一个ap表 + super(database, TABLE); + } + + @Override + protected BackendColumnIterator queryById(Session session, Id id) { + return this.getById(session, id); + } + + @Override + public boolean isOlap() { + return true; + } + } +} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/IdClient.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/IdClient.java new file mode 100644 index 000000000..3e42bce2d --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/IdClient.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore.fake; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import org.apache.hugegraph.backend.store.hstore.HstoreSessions; +import org.apache.hugegraph.pd.grpc.Pdpb; + +public abstract class IdClient { + + protected HstoreSessions.Session session; + protected String table; + + public IdClient(HstoreSessions.Session session, String table) { + this.session = session; + this.table = table; + } + + protected static byte[] b(long value) { + return ByteBuffer.allocate(Long.BYTES).order( + ByteOrder.nativeOrder()).putLong(value).array(); + } + + protected static long l(byte[] bytes) { + assert bytes.length == Long.BYTES; + return ByteBuffer.wrap(bytes).order( + ByteOrder.nativeOrder()).getLong(); + } + + public abstract Pdpb.GetIdResponse getIdByKey(String key, int delta) + throws Exception; + + public abstract Pdpb.ResetIdResponse resetIdByKey(String key) throws Exception; + + public abstract void increaseId(String key, long increment) + throws Exception; +} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/PDIdClient.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/PDIdClient.java new file mode 100644 index 000000000..0dbfc56ee --- /dev/null +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/fake/PDIdClient.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.hstore.fake; + +import org.apache.hugegraph.backend.store.hstore.HstoreSessions; +import org.apache.hugegraph.backend.store.hstore.HstoreSessionsImpl; +import org.apache.hugegraph.pd.client.PDClient; +import org.apache.hugegraph.pd.grpc.Pdpb; + +public class PDIdClient extends IdClient { + + PDClient pdClient; + + public PDIdClient(HstoreSessions.Session session, String table) { + super(session, table); + pdClient = HstoreSessionsImpl.getDefaultPdClient(); + } + + @Override + public Pdpb.GetIdResponse getIdByKey(String key, int delta) throws Exception { + return pdClient.getIdByKey(key, delta); + } + + @Override + public Pdpb.ResetIdResponse resetIdByKey(String key) throws Exception { + return pdClient.resetIdByKey(key); + } + + @Override + public void increaseId(String key, long increment) throws Exception { + pdClient.getIdByKey(key, (int) increment); + } +}
