This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/force_ci/object_type by this
push:
new ef5cef5c9d5 Two Implementations of Object File Path (#16858)
ef5cef5c9d5 is described below
commit ef5cef5c9d5e38694c8c4fe1d70aed44c6e08f3a
Author: Haonan <[email protected]>
AuthorDate: Tue Dec 16 09:17:04 2025 +0800
Two Implementations of Object File Path (#16858)
* add base32 Object Path
* add plainObjectPath and configuration
* change default configuration
* fix UT errors
* reduce bytes copy
* implement unchangeable config
* Add IT
* add config
* replace region id for object binary
* fix ut
* spotless
* spotless
* fix rebase
* fix rebase
* Fix review
---------
Co-authored-by: shuwenwei <[email protected]>
---
.../it/env/cluster/config/MppCommonConfig.java | 6 +
.../env/cluster/config/MppSharedCommonConfig.java | 7 +
.../iotdb/it/env/cluster/node/DataNodeWrapper.java | 4 +
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../relational/it/session/IoTDBObjectInsertIT.java | 329 +++++++++++++++++++++
.../it/session/IoTDBObjectInsertIT2.java | 170 +++++++++++
.../it/session/IoTDBSessionRelationalIT.java | 177 -----------
.../iotdb/confignode/conf/ConfigNodeConfig.java | 10 +
.../confignode/conf/ConfigNodeDescriptor.java | 5 +
.../confignode/conf/SystemPropertiesUtils.java | 12 +
.../iotdb/confignode/manager/node/NodeManager.java | 1 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 1 +
.../plan/planner/plan/node/write/ObjectNode.java | 42 +--
.../plan/node/write/RelationalInsertRowsNode.java | 24 +-
.../node/write/RelationalInsertTabletNode.java | 21 +-
.../unary/scalar/ReadObjectColumnTransformer.java | 3 +-
.../storageengine/dataregion/Base32ObjectPath.java | 169 +++++++++++
.../db/storageengine/dataregion/DataRegion.java | 6 +-
.../db/storageengine/dataregion/IObjectPath.java | 70 +++++
.../storageengine/dataregion/PlainObjectPath.java | 126 ++++++++
.../compaction/execute/utils/CompactionUtils.java | 15 +-
.../execute/utils/MultiTsFileDeviceIterator.java | 3 +-
.../fast/FastAlignedSeriesCompactionExecutor.java | 3 +-
.../fast/reader/CompactionAlignedChunkReader.java | 8 +-
.../read/reader/chunk/DiskAlignedChunkLoader.java | 16 +-
.../read/reader/chunk/DiskChunkLoader.java | 10 +
.../tsfile/generator/TsFileNameGenerator.java | 17 --
.../db/storageengine/rescon/disk/TierManager.java | 11 -
.../org/apache/iotdb/db/utils/ObjectTypeUtils.java | 88 +++++-
.../plan/function/RecordObjectTypeTest.java | 6 +
.../unary/scalar/ObjectTypeFunctionTest.java | 6 +
.../object/ObjectTypeCompactionTest.java | 87 ++++--
.../conf/iotdb-system.properties.template | 5 +
.../src/main/thrift/confignode.thrift | 1 +
36 files changed, 1195 insertions(+), 282 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 6cb7e008242..e7f4b228d4f 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -651,6 +651,12 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) {
+ setProperty("restrict_object_limit", String.valueOf(restrictObjectLimit));
+ return this;
+ }
+
// For part of the log directory
public String getClusterConfigStr() {
return
fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 1b4801f5692..abb0f8bf8bb 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -684,4 +684,11 @@ public class MppSharedCommonConfig implements CommonConfig
{
cnConfig.setAuditableOperationResult(auditableOperationResult);
return this;
}
+
+ @Override
+ public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) {
+ cnConfig.setRestrictObjectLimit(restrictObjectLimit);
+ dnConfig.setRestrictObjectLimit(restrictObjectLimit);
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index f08c085bc6c..96a0fbe27e0 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -169,6 +169,10 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
return getDataNodeDir() + File.separator + "system";
}
+ public String getDataNodeObjectDir() {
+ return getDataNodeDir() + File.separator + "data" + File.separator +
"object";
+ }
+
@Override
protected MppJVMConfig initVMConfig() {
return MppJVMConfig.builder()
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index fdcff20dbc8..148046423cd 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -477,4 +477,9 @@ public class RemoteCommonConfig implements CommonConfig {
public CommonConfig setAuditableOperationResult(String
auditableOperationResult) {
return this;
}
+
+ @Override
+ public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index f27bdc8d66d..531f94eec4b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -211,4 +211,6 @@ public interface CommonConfig {
CommonConfig setAuditableOperationLevel(String auditableOperationLevel);
CommonConfig setAuditableOperationResult(String auditableOperationResult);
+
+ CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT.java
new file mode 100644
index 00000000000..08946bf4cce
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.session;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.io.BaseEncoding;
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.write.record.Tablet;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertNull;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBObjectInsertIT {
+
+ @BeforeClass
+ public static void classSetUp() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS db1");
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("DROP DATABASE IF EXISTS db1");
+ }
+ }
+
+ @AfterClass
+ public static void classTearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void insertObjectTest()
+ throws IoTDBConnectionException, StatementExecutionException,
IOException {
+ String testObject =
+ System.getProperty("user.dir")
+ + File.separator
+ + "target"
+ + File.separator
+ + "test-classes"
+ + File.separator
+ + "object-example.pt";
+ File object = new File(testObject);
+
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE \"db1\"");
+ // insert table data by tablet
+ List<String> columnNameList =
+ Arrays.asList("region_id", "plant_id", "device_id", "temperature",
"file");
+ List<TSDataType> dataTypeList =
+ Arrays.asList(
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.FLOAT,
+ TSDataType.OBJECT);
+ List<ColumnCategory> columnTypeList =
+ new ArrayList<>(
+ Arrays.asList(
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.FIELD,
+ ColumnCategory.FIELD));
+ Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList,
columnTypeList, 1);
+ int rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 1);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(rowIndex, 4, true, 0,
Files.readAllBytes(Paths.get(testObject)));
+ session.insert(tablet);
+ tablet.reset();
+
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement("select file from object_table where
time = 1")) {
+ SessionDataSet.DataIterator iterator = dataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals(
+
BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())),
+ iterator.getString(1));
+ }
+ }
+
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select READ_OBJECT(file) from object_table where time = 1")) {
+ SessionDataSet.DataIterator iterator = dataSet.iterator();
+ while (iterator.next()) {
+ Binary binary = iterator.getBlob(1);
+ Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)),
binary.getValues());
+ }
+ }
+ }
+ // test object file path
+ boolean success = false;
+ for (DataNodeWrapper dataNodeWrapper :
EnvFactory.getEnv().getDataNodeWrapperList()) {
+ String objectDirStr = dataNodeWrapper.getDataNodeObjectDir();
+ File objectDir = new File(objectDirStr);
+ if (objectDir.exists() && objectDir.isDirectory()) {
+ File[] regionDirs = objectDir.listFiles();
+ if (regionDirs != null) {
+ for (File regionDir : regionDirs) {
+ if (regionDir.isDirectory()) {
+ File objectFile =
+ new File(
+ regionDir,
+ convertPathString("object_table")
+ + File.separator
+ + convertPathString("1")
+ + File.separator
+ + convertPathString("5")
+ + File.separator
+ + convertPathString("3")
+ + File.separator
+ + convertPathString("file")
+ + File.separator
+ + "1.bin");
+ if (objectFile.exists() && objectFile.isFile()) {
+ success = true;
+ }
+ }
+ }
+ }
+ }
+ }
+ Assert.assertTrue(success);
+ }
+
+ @Test
+ public void insertObjectSegmentsTest()
+ throws IoTDBConnectionException, StatementExecutionException,
IOException {
+ String testObject =
+ System.getProperty("user.dir")
+ + File.separator
+ + "target"
+ + File.separator
+ + "test-classes"
+ + File.separator
+ + "object-example.pt";
+ byte[] objectBytes = Files.readAllBytes(Paths.get(testObject));
+ List<byte[]> objectSegments = new ArrayList<>();
+ for (int i = 0; i < objectBytes.length; i += 512) {
+ objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512,
objectBytes.length)));
+ }
+
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE \"db1\"");
+ // insert table data by tablet
+ List<String> columnNameList =
+ Arrays.asList("region_id", "plant_id", "device_id", "temperature",
"file");
+ List<TSDataType> dataTypeList =
+ Arrays.asList(
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.FLOAT,
+ TSDataType.OBJECT);
+ List<ColumnCategory> columnTypeList =
+ new ArrayList<>(
+ Arrays.asList(
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.FIELD,
+ ColumnCategory.FIELD));
+ Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList,
columnTypeList, 1);
+ for (int i = 0; i < objectSegments.size() - 1; i++) {
+ int rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 1);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i));
+ session.insert(tablet);
+ tablet.reset();
+ }
+
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement("select file from object_table where
time = 1")) {
+ SessionDataSet.DataIterator iterator = dataSet.iterator();
+ while (iterator.next()) {
+ assertNull(iterator.getString(1));
+ }
+ }
+
+ // insert segment with wrong offset
+ try {
+ int rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 1);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(rowIndex, 4, false, 512L, objectSegments.get(1));
+ session.insert(tablet);
+ } catch (StatementExecutionException e) {
+ Assert.assertEquals(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(),
e.getStatusCode());
+ Assert.assertEquals(
+ String.format(
+ "741: The file length %d is not equal to the offset %d",
+ ((objectSegments.size() - 1) * 512), 512L),
+ e.getMessage());
+ } finally {
+ tablet.reset();
+ }
+
+ // last segment
+ int rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 1);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(
+ rowIndex,
+ 4,
+ true,
+ (objectSegments.size() - 1) * 512L,
+ objectSegments.get(objectSegments.size() - 1));
+ session.insert(tablet);
+ tablet.reset();
+
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select READ_OBJECT(file) from object_table where time = 1")) {
+ SessionDataSet.DataIterator iterator = dataSet.iterator();
+ while (iterator.next()) {
+ Binary binary = iterator.getBlob(1);
+ Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)),
binary.getValues());
+ }
+ }
+ }
+
+ // test object file path
+ boolean success = false;
+ for (DataNodeWrapper dataNodeWrapper :
EnvFactory.getEnv().getDataNodeWrapperList()) {
+ String objectDirStr = dataNodeWrapper.getDataNodeObjectDir();
+ File objectDir = new File(objectDirStr);
+ if (objectDir.exists() && objectDir.isDirectory()) {
+ File[] regionDirs = objectDir.listFiles();
+ if (regionDirs != null) {
+ for (File regionDir : regionDirs) {
+ if (regionDir.isDirectory()) {
+ File objectFile =
+ new File(
+ regionDir,
+ convertPathString("object_table")
+ + File.separator
+ + convertPathString("1")
+ + File.separator
+ + convertPathString("5")
+ + File.separator
+ + convertPathString("3")
+ + File.separator
+ + convertPathString("file")
+ + File.separator
+ + "1.bin");
+ if (objectFile.exists() && objectFile.isFile()) {
+ success = true;
+ }
+ }
+ }
+ }
+ }
+ }
+ Assert.assertTrue(success);
+ }
+
+ protected String convertPathString(String path) {
+ return
BaseEncoding.base32().omitPadding().encode(path.getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT2.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT2.java
new file mode 100644
index 00000000000..9bd16938f56
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT2.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.session;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.write.record.Tablet;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBObjectInsertIT2 extends IoTDBObjectInsertIT {
+
+ @BeforeClass
+ public static void classSetUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getCommonConfig().setRestrictObjectLimit(true);
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @AfterClass
+ public static void classTearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void changeRestrictObjectLimitTest()
+ throws IoTDBConnectionException, StatementExecutionException,
IOException {
+
EnvFactory.getEnv().getConfig().getCommonConfig().setRestrictObjectLimit(false);
+ TestUtils.restartCluster(EnvFactory.getEnv());
+ String testObject =
+ System.getProperty("user.dir")
+ + File.separator
+ + "target"
+ + File.separator
+ + "test-classes"
+ + File.separator
+ + "object-example.pt";
+ File object = new File(testObject);
+
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE \"db1\"");
+ // insert table data by tablet
+ List<String> columnNameList =
+ Arrays.asList("region_id", "plant_id", "device_id", "temperature",
"file");
+ List<TSDataType> dataTypeList =
+ Arrays.asList(
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.FLOAT,
+ TSDataType.OBJECT);
+ List<ColumnCategory> columnTypeList =
+ new ArrayList<>(
+ Arrays.asList(
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.FIELD,
+ ColumnCategory.FIELD));
+ Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList,
columnTypeList, 1);
+ int rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 1);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(rowIndex, 4, true, 0,
Files.readAllBytes(Paths.get(testObject)));
+ session.insert(tablet);
+ tablet.reset();
+
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement("select file from object_table where
time = 1")) {
+ SessionDataSet.DataIterator iterator = dataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals(
+
BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())),
+ iterator.getString(1));
+ }
+ }
+
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select READ_OBJECT(file) from object_table where time = 1")) {
+ SessionDataSet.DataIterator iterator = dataSet.iterator();
+ while (iterator.next()) {
+ Binary binary = iterator.getBlob(1);
+ Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)),
binary.getValues());
+ }
+ }
+ }
+ // test object file path
+ boolean success = false;
+ for (DataNodeWrapper dataNodeWrapper :
EnvFactory.getEnv().getDataNodeWrapperList()) {
+ String objectDirStr = dataNodeWrapper.getDataNodeObjectDir();
+ File objectDir = new File(objectDirStr);
+ if (objectDir.exists() && objectDir.isDirectory()) {
+ File[] regionDirs = objectDir.listFiles();
+ if (regionDirs != null) {
+ for (File regionDir : regionDirs) {
+ if (regionDir.isDirectory()) {
+ File objectFile =
+ new File(
+ regionDir,
+ convertPathString("object_table")
+ + File.separator
+ + convertPathString("1")
+ + File.separator
+ + convertPathString("5")
+ + File.separator
+ + convertPathString("3")
+ + File.separator
+ + convertPathString("file")
+ + File.separator
+ + "1.bin");
+ if (objectFile.exists() && objectFile.isFile()) {
+ success = true;
+ }
+ }
+ }
+ }
+ }
+ }
+ Assert.assertTrue(success);
+ }
+
+ @Override
+ protected String convertPathString(String path) {
+ return path;
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
index 49884e3c70b..9c02ac94208 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.TableSessionBuilder;
import org.apache.tsfile.enums.ColumnCategory;
@@ -45,7 +44,6 @@ import
org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -63,8 +61,6 @@ import org.junit.runner.RunWith;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
@@ -851,179 +847,6 @@ public class IoTDBSessionRelationalIT {
}
}
- @Test
- public void insertObjectTest()
- throws IoTDBConnectionException, StatementExecutionException,
IOException {
- String testObject =
- System.getProperty("user.dir")
- + File.separator
- + "target"
- + File.separator
- + "test-classes"
- + File.separator
- + "object-example.pt";
- File object = new File(testObject);
-
- try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
- session.executeNonQueryStatement("USE \"db1\"");
- // insert table data by tablet
- List<String> columnNameList =
- Arrays.asList("region_id", "plant_id", "device_id", "temperature",
"file");
- List<TSDataType> dataTypeList =
- Arrays.asList(
- TSDataType.STRING,
- TSDataType.STRING,
- TSDataType.STRING,
- TSDataType.FLOAT,
- TSDataType.OBJECT);
- List<ColumnCategory> columnTypeList =
- new ArrayList<>(
- Arrays.asList(
- ColumnCategory.TAG,
- ColumnCategory.TAG,
- ColumnCategory.TAG,
- ColumnCategory.FIELD,
- ColumnCategory.FIELD));
- Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList,
columnTypeList, 1);
- int rowIndex = tablet.getRowSize();
- tablet.addTimestamp(rowIndex, 1);
- tablet.addValue(rowIndex, 0, "1");
- tablet.addValue(rowIndex, 1, "5");
- tablet.addValue(rowIndex, 2, "3");
- tablet.addValue(rowIndex, 3, 37.6F);
- tablet.addValue(rowIndex, 4, true, 0,
Files.readAllBytes(Paths.get(testObject)));
- session.insert(tablet);
- tablet.reset();
-
- try (SessionDataSet dataSet =
- session.executeQueryStatement("select file from object_table where
time = 1")) {
- SessionDataSet.DataIterator iterator = dataSet.iterator();
- while (iterator.next()) {
- Assert.assertEquals(
-
BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())),
- iterator.getString(1));
- }
- }
-
- try (SessionDataSet dataSet =
- session.executeQueryStatement(
- "select READ_OBJECT(file) from object_table where time = 1")) {
- SessionDataSet.DataIterator iterator = dataSet.iterator();
- while (iterator.next()) {
- Binary binary = iterator.getBlob(1);
- Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)),
binary.getValues());
- }
- }
- }
- }
-
- @Test
- public void insertObjectSegmentsTest()
- throws IoTDBConnectionException, StatementExecutionException,
IOException {
- String testObject =
- System.getProperty("user.dir")
- + File.separator
- + "target"
- + File.separator
- + "test-classes"
- + File.separator
- + "object-example.pt";
- byte[] objectBytes = Files.readAllBytes(Paths.get(testObject));
- List<byte[]> objectSegments = new ArrayList<>();
- for (int i = 0; i < objectBytes.length; i += 512) {
- objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512,
objectBytes.length)));
- }
-
- try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
- session.executeNonQueryStatement("USE \"db1\"");
- // insert table data by tablet
- List<String> columnNameList =
- Arrays.asList("region_id", "plant_id", "device_id", "temperature",
"file");
- List<TSDataType> dataTypeList =
- Arrays.asList(
- TSDataType.STRING,
- TSDataType.STRING,
- TSDataType.STRING,
- TSDataType.FLOAT,
- TSDataType.OBJECT);
- List<ColumnCategory> columnTypeList =
- new ArrayList<>(
- Arrays.asList(
- ColumnCategory.TAG,
- ColumnCategory.TAG,
- ColumnCategory.TAG,
- ColumnCategory.FIELD,
- ColumnCategory.FIELD));
- Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList,
columnTypeList, 1);
- for (int i = 0; i < objectSegments.size() - 1; i++) {
- int rowIndex = tablet.getRowSize();
- tablet.addTimestamp(rowIndex, 1);
- tablet.addValue(rowIndex, 0, "1");
- tablet.addValue(rowIndex, 1, "5");
- tablet.addValue(rowIndex, 2, "3");
- tablet.addValue(rowIndex, 3, 37.6F);
- tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i));
- session.insert(tablet);
- tablet.reset();
- }
-
- try (SessionDataSet dataSet =
- session.executeQueryStatement("select file from object_table where
time = 1")) {
- SessionDataSet.DataIterator iterator = dataSet.iterator();
- while (iterator.next()) {
- assertNull(iterator.getString(1));
- }
- }
-
- // insert segment with wrong offset
- try {
- int rowIndex = tablet.getRowSize();
- tablet.addTimestamp(rowIndex, 1);
- tablet.addValue(rowIndex, 0, "1");
- tablet.addValue(rowIndex, 1, "5");
- tablet.addValue(rowIndex, 2, "3");
- tablet.addValue(rowIndex, 3, 37.6F);
- tablet.addValue(rowIndex, 4, false, 512L, objectSegments.get(1));
- session.insert(tablet);
- } catch (StatementExecutionException e) {
- Assert.assertEquals(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(),
e.getStatusCode());
- Assert.assertEquals(
- String.format(
- "741: The file length %d is not equal to the offset %d",
- ((objectSegments.size() - 1) * 512), 512L),
- e.getMessage());
- } finally {
- tablet.reset();
- }
-
- // last segment
- int rowIndex = tablet.getRowSize();
- tablet.addTimestamp(rowIndex, 1);
- tablet.addValue(rowIndex, 0, "1");
- tablet.addValue(rowIndex, 1, "5");
- tablet.addValue(rowIndex, 2, "3");
- tablet.addValue(rowIndex, 3, 37.6F);
- tablet.addValue(
- rowIndex,
- 4,
- true,
- (objectSegments.size() - 1) * 512L,
- objectSegments.get(objectSegments.size() - 1));
- session.insert(tablet);
- tablet.reset();
-
- try (SessionDataSet dataSet =
- session.executeQueryStatement(
- "select READ_OBJECT(file) from object_table where time = 1")) {
- SessionDataSet.DataIterator iterator = dataSet.iterator();
- while (iterator.next()) {
- Binary binary = iterator.getBlob(1);
- Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)),
binary.getValues());
- }
- }
- }
- }
-
@Test
public void autoCreateNontagColumnTest()
throws IoTDBConnectionException, StatementExecutionException {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 88e8d76001d..34b69382b8f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -316,6 +316,8 @@ public class ConfigNodeConfig {
private long forceWalPeriodForConfigNodeSimpleInMs = 100;
+ private boolean restrictObjectLimit = false;
+
public ConfigNodeConfig() {
// empty constructor
}
@@ -1275,4 +1277,12 @@ public class ConfigNodeConfig {
public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) {
this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs;
}
+
+ public boolean getRestrictObjectLimit() {
+ return restrictObjectLimit;
+ }
+
+ public void setRestrictObjectLimit(boolean restrictObjectLimit) {
+ this.restrictObjectLimit = restrictObjectLimit;
+ }
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 0ea7a278732..0d3968d3f7a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -368,6 +368,11 @@ public class ConfigNodeDescriptor {
readConsistencyLevel));
}
+ conf.setRestrictObjectLimit(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "restrict_object_limit", String.valueOf(conf.getRestrictObjectLimit()))));
+
// commons
commonDescriptor.loadCommonProps(properties);
commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index 529f15d06cd..9cf6b2dac96 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -213,6 +213,16 @@ public class SystemPropertiesUtils {
COMMON_CONFIG.setEnableGrantOption(enableGrantOption);
}
}
+
+ if (systemProperties.getProperty("restrict_object_limit", null) != null) {
+ boolean restrictObjectLimit =
+ Boolean.parseBoolean(systemProperties.getProperty("restrict_object_limit"));
+ if (restrictObjectLimit != conf.getRestrictObjectLimit()) {
+ LOGGER.warn(
+ format, "restrict_object_limit", conf.getRestrictObjectLimit(), restrictObjectLimit);
+ conf.setRestrictObjectLimit(restrictObjectLimit);
+ }
+ }
}
/**
@@ -286,6 +296,8 @@ public class SystemPropertiesUtils {
"tag_attribute_total_size", String.valueOf(COMMON_CONFIG.getTagAttributeTotalSize()));
systemProperties.setProperty(
"enable_grant_option", String.valueOf(COMMON_CONFIG.getEnableGrantOption()));
+ systemProperties.setProperty(
+ "restrict_object_limit", String.valueOf(conf.getRestrictObjectLimit()));
systemPropertiesHandler.overwrite(systemProperties);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index e8083b79b68..234227286df 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -179,6 +179,7 @@ public class NodeManager {
globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode());
globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize());
globalConfig.setEnableGrantOption(commonConfig.getEnableGrantOption());
+ globalConfig.setRestrictObjectLimit(configNodeConfig.getRestrictObjectLimit());
dataSet.setGlobalConfig(globalConfig);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3f19a2101a8..6f861119d39 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1206,8 +1206,11 @@ public class IoTDBConfig {
private ConcurrentHashMap<String, EncryptParameter> tsFileDBToEncryptMap =
new ConcurrentHashMap<>(
Collections.singletonMap("root.__audit", new EncryptParameter("UNENCRYPTED", null)));
+
private long maxObjectSizeInByte = 4 * 1024 * 1024 * 1024L;
+ private boolean restrictObjectLimit = false;
+
IoTDBConfig() {}
public int getMaxLogEntriesNumPerBatch() {
@@ -4335,4 +4338,12 @@ public class IoTDBConfig {
public void setMaxObjectSizeInByte(long maxObjectSizeInByte) {
this.maxObjectSizeInByte = maxObjectSizeInByte;
}
+
+ public boolean getRestrictObjectLimit() {
+ return restrictObjectLimit;
+ }
+
+ public void setRestrictObjectLimit(boolean restrictObjectLimit) {
+ this.restrictObjectLimit = restrictObjectLimit;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e51f445c567..6a1e8dbcccb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2746,6 +2746,7 @@ public class IoTDBDescriptor {
conf.setSeriesPartitionExecutorClass(globalConfig.getSeriesPartitionExecutorClass());
conf.setSeriesPartitionSlotNum(globalConfig.getSeriesPartitionSlotNum());
conf.setReadConsistencyLevel(globalConfig.getReadConsistencyLevel());
+ conf.setRestrictObjectLimit(globalConfig.isRestrictObjectLimit());
}
public void loadRatisConfig(TRatisConfig ratisConfig) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
index 10fb9bc3443..4ec37e46bf2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
@@ -29,11 +29,11 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.storageengine.dataregion.IObjectPath;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
-import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.tsfile.utils.PublicBAOS;
@@ -60,7 +60,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
private byte[] content;
- private String filePath;
+ private IObjectPath filePath;
private final int contentLength;
@@ -68,7 +68,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
private boolean isGeneratedByRemoteConsensusLeader;
- public ObjectNode(boolean isEOF, long offset, byte[] content, String filePath) {
+ public ObjectNode(boolean isEOF, long offset, byte[] content, IObjectPath filePath) {
super(new PlanNodeId(""));
this.isEOF = isEOF;
this.offset = offset;
@@ -77,7 +77,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
this.contentLength = content.length;
}
- public ObjectNode(boolean isEOF, long offset, int contentLength, String filePath) {
+ public ObjectNode(boolean isEOF, long offset, int contentLength, IObjectPath filePath) {
super(new PlanNodeId(""));
this.isEOF = isEOF;
this.offset = offset;
@@ -97,12 +97,12 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
return offset;
}
- public void setFilePath(String filePath) {
+ public void setFilePath(IObjectPath filePath) {
this.filePath = filePath;
}
- public String getFilePath() {
- return filePath;
+ public String getFilePathString() {
+ return filePath.toString();
}
@Override
@@ -111,7 +111,11 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
buffer.putLong(searchIndex);
buffer.put((byte) (isEOF ? 1 : 0));
buffer.putLong(offset);
- WALWriteUtils.write(filePath, buffer);
+ try {
+ filePath.serialize(buffer);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
buffer.putInt(content.length);
}
@@ -122,14 +126,14 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
+ Byte.BYTES
+ Long.BYTES
+ Integer.BYTES
- + ReadWriteIOUtils.sizeToWrite(filePath);
+ + filePath.getSerializedSize();
}
public static ObjectNode deserializeFromWAL(DataInputStream stream) throws IOException {
long searchIndex = stream.readLong();
boolean isEOF = stream.readByte() == 1;
long offset = stream.readLong();
- String filePath = ReadWriteIOUtils.readString(stream);
+ IObjectPath filePath = IObjectPath.getDeserializer().deserializeFrom(stream);
int contentLength = stream.readInt();
ObjectNode objectNode = new ObjectNode(isEOF, offset, contentLength, filePath);
objectNode.setSearchIndex(searchIndex);
@@ -140,8 +144,9 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
long searchIndex = buffer.getLong();
boolean isEOF = buffer.get() == 1;
long offset = buffer.getLong();
- String filePath = ReadWriteIOUtils.readString(buffer);
- Optional<File> objectFile = TierManager.getInstance().getAbsoluteObjectFilePath(filePath);
+ IObjectPath filePath = IObjectPath.getDeserializer().deserializeFrom(buffer);
+ Optional<File> objectFile =
+ TierManager.getInstance().getAbsoluteObjectFilePath(filePath.toString());
int contentLength = buffer.getInt();
byte[] contents = new byte[contentLength];
if (objectFile.isPresent()) {
@@ -152,7 +157,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
throw new RuntimeException(e);
}
} else {
- throw new ObjectFileNotExist(filePath);
+ throw new ObjectFileNotExist(filePath.toString());
}
ObjectNode objectNode = new ObjectNode(isEOF, offset, contents, filePath);
@@ -163,7 +168,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
public static ObjectNode deserialize(ByteBuffer byteBuffer) {
boolean isEoF = ReadWriteIOUtils.readBool(byteBuffer);
long offset = ReadWriteIOUtils.readLong(byteBuffer);
- String filePath = ReadWriteIOUtils.readString(byteBuffer);
+ IObjectPath filePath = IObjectPath.getDeserializer().deserializeFrom(byteBuffer);
int contentLength = ReadWriteIOUtils.readInt(byteBuffer);
byte[] content = ReadWriteIOUtils.readBytes(byteBuffer, contentLength);
return new ObjectNode(isEoF, offset, content, filePath);
@@ -227,7 +232,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
getType().serialize(byteBuffer);
ReadWriteIOUtils.write(isEOF, byteBuffer);
ReadWriteIOUtils.write(offset, byteBuffer);
- ReadWriteIOUtils.write(filePath, byteBuffer);
+ filePath.serialize(byteBuffer);
ReadWriteIOUtils.write(contentLength, byteBuffer);
byteBuffer.put(content);
}
@@ -237,7 +242,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
getType().serialize(stream);
ReadWriteIOUtils.write(isEOF, stream);
ReadWriteIOUtils.write(offset, stream);
- ReadWriteIOUtils.write(filePath, stream);
+ filePath.serialize(stream);
ReadWriteIOUtils.write(contentLength, stream);
stream.write(content);
}
@@ -251,7 +256,8 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
byte[] contents = new byte[contentLength];
boolean readSuccess = false;
for (int i = 0; i < 2; i++) {
- Optional<File> objectFile = TierManager.getInstance().getAbsoluteObjectFilePath(filePath);
+ Optional<File> objectFile =
+ TierManager.getInstance().getAbsoluteObjectFilePath(filePath.toString());
if (objectFile.isPresent()) {
try {
readContentFromFile(objectFile.get(), contents);
@@ -279,7 +285,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
}
ReadWriteIOUtils.write(readSuccess && isEOF, stream);
ReadWriteIOUtils.write(offset, stream);
- ReadWriteIOUtils.write(filePath, stream);
+ filePath.serialize(stream);
ReadWriteIOUtils.write(contentLength, stream);
stream.write(contents);
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 2297ddcebdd..6eeb3d7322f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -27,24 +27,24 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.iotdb.db.storageengine.dataregion.IObjectPath;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.iotdb.db.utils.ObjectTypeUtils.generateObjectBinary;
+
public class RelationalInsertRowsNode extends InsertRowsNode {
// deviceId cache for Table-view insertion
private IDeviceID[] deviceIDs;
@@ -215,22 +215,22 @@ public class RelationalInsertRowsNode extends InsertRowsNode {
boolean isEoF = buffer.get() == 1;
long offset = buffer.getLong();
byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining());
- String relativePath =
- TsFileNameGenerator.generateObjectFilePath(
+ IObjectPath relativePath =
+ IObjectPath.Factory.FACTORY.create(
dataRegionReplicaSet.getRegionId().getId(),
insertRowNode.getTime(),
insertRowNode.getDeviceID(),
insertRowNode.getMeasurements()[j]);
ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath);
objectNode.setDataRegionReplicaSet(dataRegionReplicaSet);
- byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8);
- byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
- System.arraycopy(
- BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, Long.BYTES);
- System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length);
- ((Binary) values[j]).setValues(valueBytes);
- insertRowNode.setValues(values);
writePlanNodeList.add(objectNode);
+ if (isEoF) {
+ ((Binary) values[j])
+ .setValues(generateObjectBinary(offset + content.length, relativePath).getValues());
+ insertRowNode.setValues(values);
+ } else {
+ values[j] = null;
+ }
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index e3a114211e1..3255c11f1e9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.iotdb.db.storageengine.dataregion.IObjectPath;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.tsfile.enums.TSDataType;
@@ -41,7 +41,6 @@ import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
-import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -50,13 +49,14 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static org.apache.iotdb.db.utils.ObjectTypeUtils.generateObjectBinary;
+
public class RelationalInsertTabletNode extends InsertTabletNode {
// deviceId cache for Table-view insertion
@@ -465,24 +465,23 @@ public class RelationalInsertTabletNode extends InsertTabletNode {
Map.Entry<TRegionReplicaSet, List<Integer>> entry,
List<WritePlanNode> result) {
for (int j = startRow; j < endRow; j++) {
+ if (((Binary[]) columns[column])[j] == null) {
+ continue;
+ }
byte[] binary = ((Binary[]) columns[column])[j].getValues();
ByteBuffer buffer = ByteBuffer.wrap(binary);
boolean isEoF = buffer.get() == 1;
long offset = buffer.getLong();
byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining());
- String relativePath =
- TsFileNameGenerator.generateObjectFilePath(
+ IObjectPath relativePath =
+ IObjectPath.Factory.FACTORY.create(
entry.getKey().getRegionId().getId(), times[j], getDeviceID(j), measurements[column]);
ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath);
objectNode.setDataRegionReplicaSet(entry.getKey());
result.add(objectNode);
if (isEoF) {
- byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8);
- byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
- System.arraycopy(
- BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, Long.BYTES);
- System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length);
- ((Binary[]) columns[column])[j] = new Binary(valueBytes);
+ ((Binary[]) columns[column])[j] =
+ generateObjectBinary(offset + content.length, relativePath);
} else {
((Binary[]) columns[column])[j] = null;
if (bitMaps == null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
index 3049a9bc441..a4ad4e25756 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
@@ -100,7 +100,8 @@ public class ReadObjectColumnTransformer extends UnaryColumnTransformer {
}
private Binary readObject(Binary binary) {
- Pair<Long, String> objectLengthPathPair = ObjectTypeUtils.parseObjectBinary(binary);
+ Pair<Long, String> objectLengthPathPair =
+ ObjectTypeUtils.parseObjectBinaryToSizeStringPathPair(binary);
long fileLength = objectLengthPathPair.getLeft();
String relativePath = objectLengthPathPair.getRight();
int actualReadSize =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java
new file mode 100644
index 00000000000..d0ea395502a
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion;
+
+import com.google.common.io.BaseEncoding;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class Base32ObjectPath implements IObjectPath {
+
+ private final Path path;
+ private int serializedSize = -1;
+
+ private static final Deserializer DESERIALIZER =
+ new Deserializer() {
+ @Override
+ public IObjectPath deserializeFrom(ByteBuffer byteBuffer) {
+ return deserialize(byteBuffer);
+ }
+
+ @Override
+ public IObjectPath deserializeFrom(InputStream inputStream) throws IOException {
+ return deserialize(inputStream);
+ }
+
+ @Override
+ public IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer) {
+ return deserialize(byteBuffer);
+ }
+ };
+
+ private static final Factory FACTORY = Base32ObjectPath::new;
+
+ private Base32ObjectPath(String first, String... more) {
+ path = Paths.get(first, more);
+ }
+
+ public Base32ObjectPath(Path path) {
+ this.path = path;
+ }
+
+ public Base32ObjectPath(int regionId, long time, IDeviceID iDeviceID, String measurement) {
+ Object[] segments = iDeviceID.getSegments();
+ String[] pathSegments = new String[segments.length + 2];
+ for (int i = 0; i < segments.length; i++) {
+ Object segment = segments[i];
+ String segmentString = segment == null ? "null" : segment.toString();
+ pathSegments[i] =
+ BaseEncoding.base32()
+ .omitPadding()
+ .encode(segmentString.getBytes(StandardCharsets.UTF_8));
+ }
+ pathSegments[pathSegments.length - 2] =
+ BaseEncoding.base32().omitPadding().encode(measurement.getBytes(StandardCharsets.UTF_8));
+ pathSegments[pathSegments.length - 1] = time + ".bin";
+ path = Paths.get(String.valueOf(regionId), pathSegments);
+ }
+
+ @Override
+ public int serialize(ByteBuffer byteBuffer) {
+ int cnt = 0;
+ cnt += ReadWriteForEncodingUtils.writeUnsignedVarInt(path.getNameCount(), byteBuffer);
+ for (Path segment : path) {
+ cnt += ReadWriteIOUtils.writeVar(segment.toString(), byteBuffer);
+ }
+ return cnt;
+ }
+
+ @Override
+ public int serialize(OutputStream outputStream) throws IOException {
+ int cnt = 0;
+ cnt += ReadWriteForEncodingUtils.writeUnsignedVarInt(path.getNameCount(), outputStream);
+ for (Path segment : path) {
+ cnt += ReadWriteIOUtils.writeVar(segment.toString(), outputStream);
+ }
+ return cnt;
+ }
+
+ @Override
+ public int getSerializedSize() {
+ if (serializedSize != -1) {
+ return serializedSize;
+ }
+ int cnt = ReadWriteForEncodingUtils.varIntSize(path.getNameCount());
+ for (Path segment : path) {
+ byte[] bytes = segment.toString().getBytes(StandardCharsets.UTF_8);
+ cnt += ReadWriteForEncodingUtils.varIntSize(bytes.length);
+ cnt += bytes.length;
+ }
+ serializedSize = cnt;
+ return cnt;
+ }
+
+ @Override
+ public void serializeToObjectValue(ByteBuffer byteBuffer) {
+ serialize(byteBuffer);
+ }
+
+ @Override
+ public int getSerializeSizeToObjectValue() {
+ return getSerializedSize();
+ }
+
+ public static Base32ObjectPath deserialize(ByteBuffer byteBuffer) {
+ int cnt = ReadWriteForEncodingUtils.readUnsignedVarInt(byteBuffer);
+ String first = ReadWriteIOUtils.readVarIntString(byteBuffer);
+ String[] more = new String[cnt - 1];
+
+ for (int i = 0; i < cnt - 1; ++i) {
+ more[i] = ReadWriteIOUtils.readVarIntString(byteBuffer);
+ }
+ return new Base32ObjectPath(first, more);
+ }
+
+ public static Base32ObjectPath deserialize(InputStream stream) throws IOException {
+ int cnt = ReadWriteForEncodingUtils.readUnsignedVarInt(stream);
+ String first = ReadWriteIOUtils.readVarIntString(stream);
+ String[] more = new String[cnt - 1];
+
+ for (int i = 0; i < cnt - 1; ++i) {
+ more[i] = ReadWriteIOUtils.readVarIntString(stream);
+ }
+
+ return new Base32ObjectPath(first, more);
+ }
+
+ @Override
+ public String toString() {
+ return path.toString();
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public static Factory getFACTORY() {
+ return FACTORY;
+ }
+
+ public static Deserializer getDESERIALIZER() {
+ return DESERIALIZER;
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 4db15d48bc6..09c9ee84304 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3584,7 +3584,7 @@ public class DataRegion implements IDataRegionForQuery {
public void writeObject(ObjectNode objectNode) throws Exception {
writeLock("writeObject");
try {
- String relativeTmpPathString = objectNode.getFilePath() + ".tmp";
+ String relativeTmpPathString = objectNode.getFilePathString() + ".tmp";
String objectFileDir = TierManager.getInstance().getNextFolderForObjectFile();
File objectTmpFile =
FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeTmpPathString);
@@ -3596,9 +3596,9 @@ public class DataRegion implements IDataRegionForQuery {
}
if (objectNode.isEOF()) {
File objectFile =
- FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectNode.getFilePath());
+ FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectNode.getFilePathString());
if (objectFile.exists()) {
- String relativeBackPathString = objectNode.getFilePath() + ".back";
+ String relativeBackPathString = objectNode.getFilePathString() + ".back";
File objectBackFile =
FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeBackPathString);
Files.move(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java
new file mode 100644
index 00000000000..c340aae440c
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface IObjectPath {
+
+ IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+ int serialize(ByteBuffer byteBuffer);
+
+ int serialize(OutputStream outputStream) throws IOException;
+
+ int getSerializedSize();
+
+ void serializeToObjectValue(ByteBuffer byteBuffer);
+
+ int getSerializeSizeToObjectValue();
+
+ interface Factory {
+
+ IObjectPath create(int regionId, long time, IDeviceID iDeviceID, String measurement);
+
+ Factory FACTORY =
+ CONFIG.getRestrictObjectLimit()
+ ? PlainObjectPath.getFACTORY()
+ : Base32ObjectPath.getFACTORY();
+ }
+
+ interface Deserializer {
+
+ IObjectPath deserializeFrom(ByteBuffer byteBuffer);
+
+ IObjectPath deserializeFrom(InputStream inputStream) throws IOException;
+
+ IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer);
+ }
+
+ static Deserializer getDeserializer() {
+ return CONFIG.getRestrictObjectLimit()
+ ? PlainObjectPath.getDESERIALIZER()
+ : Base32ObjectPath.getDESERIALIZER();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java
new file mode 100644
index 00000000000..3ae3a925a3c
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
+
+public class PlainObjectPath implements IObjectPath {
+
+ private final String filePath;
+
+ private static final Deserializer DESERIALIZER =
+ new Deserializer() {
+ @Override
+ public IObjectPath deserializeFrom(ByteBuffer byteBuffer) {
+ return deserialize(byteBuffer);
+ }
+
+ @Override
+ public IObjectPath deserializeFrom(InputStream inputStream) throws
IOException {
+ return deserialize(inputStream);
+ }
+
+ @Override
+ public IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer) {
+ return deserializeObjectValue(byteBuffer);
+ }
+ };
+
+ private static final Factory FACTORY = PlainObjectPath::new;
+
+ public PlainObjectPath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public PlainObjectPath(int regionId, long time, IDeviceID iDeviceID, String
measurement) {
+ String objectFileName = time + ".bin";
+ Object[] segments = iDeviceID.getSegments();
+ StringBuilder relativePathString =
+ new StringBuilder(String.valueOf(regionId)).append(File.separator);
+ for (Object segment : segments) {
+ relativePathString
+ .append(segment == null ? "null" : segment.toString().toLowerCase())
+ .append(File.separator);
+ }
+ relativePathString.append(measurement).append(File.separator);
+ relativePathString.append(objectFileName);
+ this.filePath = relativePathString.toString();
+ }
+
+ @Override
+ public int serialize(ByteBuffer byteBuffer) {
+ return ReadWriteIOUtils.write(filePath, byteBuffer);
+ }
+
+ @Override
+ public int serialize(OutputStream outputStream) throws IOException {
+ return ReadWriteIOUtils.write(filePath, outputStream);
+ }
+
+ @Override
+ public int getSerializedSize() {
+ return ReadWriteIOUtils.sizeToWrite(filePath);
+ }
+
+ @Override
+ public void serializeToObjectValue(ByteBuffer byteBuffer) {
+ byteBuffer.put(filePath.getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public int getSerializeSizeToObjectValue() {
+ return filePath.getBytes(StandardCharsets.UTF_8).length;
+ }
+
+ public static PlainObjectPath deserialize(ByteBuffer byteBuffer) {
+ String filePath = ReadWriteIOUtils.readString(byteBuffer);
+ return new PlainObjectPath(filePath);
+ }
+
+ public static PlainObjectPath deserialize(InputStream stream) throws
IOException {
+ String filePath = ReadWriteIOUtils.readString(stream);
+ return new PlainObjectPath(filePath);
+ }
+
+ public static PlainObjectPath deserializeObjectValue(ByteBuffer byteBuffer) {
+ return new
PlainObjectPath(StandardCharsets.UTF_8.decode(byteBuffer).toString());
+ }
+
+ @Override
+ public String toString() {
+ return filePath;
+ }
+
+ public static Factory getFACTORY() {
+ return FACTORY;
+ }
+
+ public static Deserializer getDESERIALIZER() {
+ return DESERIALIZER;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index 0262eb36170..bac48729167 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -533,7 +533,8 @@ public class CompactionUtils {
TsFileSequenceReader reader,
List<AbstractAlignedChunkMetadata> alignedChunkMetadataList,
List<ModEntry> timeMods,
- List<List<ModEntry>> valueMods)
+ List<List<ModEntry>> valueMods,
+ int currentRegionId)
throws IOException {
if (alignedChunkMetadataList.isEmpty()) {
return;
@@ -578,7 +579,8 @@ public class CompactionUtils {
objectColumnIndexList,
timeDeletionIntervalList,
objectDeletionIntervalList,
- deletionCursors);
+ deletionCursors,
+ currentRegionId);
}
}
@@ -589,7 +591,8 @@ public class CompactionUtils {
List<Integer> objectColumnIndexList,
List<ModEntry> timeDeletions,
List<List<ModEntry>> objectDeletions,
- int[] deletionCursors)
+ int[] deletionCursors,
+ int currentRegionId)
throws IOException {
Chunk timeChunk =
reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
@@ -612,6 +615,12 @@ public class CompactionUtils {
continue;
}
Chunk chunk = reader.readMemChunk(valueChunkMetadata);
+ if (chunk != null) {
+ chunk
+ .getHeader()
+ .setReplaceDecoder(
+ decoder -> ObjectTypeUtils.getReplaceDecoder(decoder,
currentRegionId));
+ }
valueChunks.add(chunk);
valuePages.add(
chunk == null
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 7b99541c006..c07f5a1ad40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -489,7 +489,8 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
readerMap.get(tsFileResource),
alignedChunkMetadataList,
Collections.singletonList(ttlDeletion),
- modificationForValueColumns.stream().map(v ->
emptyList).collect(Collectors.toList()));
+ modificationForValueColumns.stream().map(v ->
emptyList).collect(Collectors.toList()),
+ tsFileResource.getTsFileID().regionId);
}
ModificationUtils.modifyAlignedChunkMetaData(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
index 166417a97ea..7a8e884a491 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
@@ -284,7 +284,8 @@ public class FastAlignedSeriesCompactionExecutor extends
SeriesCompactionExecuto
readerCacheMap.get(resource),
alignedChunkMetadataList,
Collections.singletonList(ttlDeletion),
- valueModifications.stream().map(v ->
emptyList).collect(Collectors.toList()));
+ valueModifications.stream().map(v ->
emptyList).collect(Collectors.toList()),
+ resource.getTsFileID().regionId);
}
// modify aligned chunk metadatas
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java
index a94150deca1..3cbc0c2fbae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java
@@ -92,7 +92,7 @@ public class CompactionAlignedChunkReader {
ByteBuffer compressedTimePageData,
List<ByteBuffer> compressedValuePageDatas)
throws IOException {
- return getPontReader(
+ return getPointReader(
timePageHeader,
valuePageHeaders,
compressedTimePageData,
@@ -106,11 +106,11 @@ public class CompactionAlignedChunkReader {
ByteBuffer compressedTimePageData,
List<ByteBuffer> compressedValuePageDatas)
throws IOException {
- return getPontReader(
+ return getPointReader(
timePageHeader, valuePageHeaders, compressedTimePageData,
compressedValuePageDatas, false);
}
- private IPointReader getPontReader(
+ private IPointReader getPointReader(
PageHeader timePageHeader,
List<PageHeader> valuePageHeaders,
ByteBuffer compressedTimePageData,
@@ -146,7 +146,7 @@ public class CompactionAlignedChunkReader {
valuePageHeaders.get(i),
uncompressedPageData,
valueType,
- Decoder.getDecoderByType(valueChunkHeader.getEncodingType(),
valueType));
+ valueChunkHeader.calculateDecoderForNonTimeChunk());
valuePageReader.setDeleteIntervalList(valueDeleteIntervalList.get(i));
valuePageReaders.add(valuePageReader);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java
index a7c6eb96d42..27883b34e9c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java
@@ -24,7 +24,9 @@ import
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -92,7 +94,7 @@ public class DiskAlignedChunkLoader implements IChunkLoader {
context);
List<Chunk> valueChunkList = new ArrayList<>();
for (IChunkMetadata valueChunkMetadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
- valueChunkList.add(
+ Chunk chunk =
valueChunkMetadata == null
? null
: ChunkCache.getInstance()
@@ -104,7 +106,17 @@ public class DiskAlignedChunkLoader implements
IChunkLoader {
resource.isClosed()),
valueChunkMetadata.getDeleteIntervalList(),
valueChunkMetadata.getStatistics(),
- context));
+ context);
+ final TsFileID tsFileID = getTsFileID();
+ if (chunk != null
+ && tsFileID.regionId > 0
+ && chunkMetaData.getDataType() == TSDataType.OBJECT) {
+ chunk
+ .getHeader()
+ .setReplaceDecoder(
+ decoder -> ObjectTypeUtils.getReplaceDecoder(decoder,
tsFileID.regionId));
+ }
+ valueChunkList.add(chunk);
}
long t2 = System.nanoTime();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java
index be33428ae65..a2ff24233d9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java
@@ -24,7 +24,9 @@ import
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.read.common.Chunk;
@@ -85,6 +87,14 @@ public class DiskChunkLoader implements IChunkLoader {
chunkMetaData.getStatistics(),
context);
+ final TsFileID tsFileID = getTsFileID();
+ if (tsFileID.regionId > 0 && chunkMetaData.getDataType() ==
TSDataType.OBJECT) {
+ chunk
+ .getHeader()
+ .setReplaceDecoder(
+ decoder -> ObjectTypeUtils.getReplaceDecoder(decoder,
tsFileID.regionId));
+ }
+
long t2 = System.nanoTime();
IChunkReader chunkReader = new ChunkReader(chunk, globalTimeFilter);
SeriesScanCostMetricSet.getInstance()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java
index 8ad353a96ef..16be82188e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java
@@ -28,7 +28,6 @@ import
org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.tsfile.common.constant.TsFileConstant;
-import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
import org.slf4j.Logger;
@@ -176,22 +175,6 @@ public class TsFileNameGenerator {
}
}
- public static String generateObjectFilePath(
- int regionId, long time, IDeviceID iDeviceID, String measurement) {
- String objectFileName = time + ".bin";
- Object[] segments = iDeviceID.getSegments();
- StringBuilder relativePathString =
- new StringBuilder(String.valueOf(regionId)).append(File.separator);
- for (Object segment : segments) {
- relativePathString
- .append(segment == null ? "null" : segment.toString().toLowerCase())
- .append(File.separator);
- }
- relativePathString.append(measurement).append(File.separator);
- relativePathString.append(objectFileName);
- return relativePathString.toString();
- }
-
@TestOnly
public static TsFileResource increaseCrossCompactionCnt(TsFileResource
tsFileResource)
throws IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
index 6355880cebd..a5fa8b54e7b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.rescon.disk;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.ObjectFileNotExist;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
@@ -345,16 +344,6 @@ public class TierManager {
return tierDiskSpace;
}
- public File getObjectFile(String relativePath) {
- for (String folder : objectDirs) {
- File file = new File(folder, relativePath);
- if (file.exists()) {
- return file;
- }
- }
- throw new ObjectFileNotExist(relativePath);
- }
-
private enum DiskSpaceType {
TOTAL,
USABLE,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
index 7dd142b77ce..aebcaf9a4dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
@@ -30,13 +30,19 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.service.metrics.FileMetrics;
+import org.apache.iotdb.db.storageengine.dataregion.Base32ObjectPath;
+import org.apache.iotdb.db.storageengine.dataregion.IObjectPath;
+import org.apache.iotdb.db.storageengine.dataregion.PlainObjectPath;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.encoding.decoder.DecoderWrapper;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +51,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
@@ -60,7 +67,8 @@ public class ObjectTypeUtils {
public static ByteBuffer readObjectContent(
Binary binary, long offset, int length, boolean mayNotInCurrentNode) {
- Pair<Long, String> objectLengthPathPair =
ObjectTypeUtils.parseObjectBinary(binary);
+ Pair<Long, String> objectLengthPathPair =
+ ObjectTypeUtils.parseObjectBinaryToSizeStringPathPair(binary);
long fileLength = objectLengthPathPair.getLeft();
String relativePath = objectLengthPathPair.getRight();
int actualReadSize =
@@ -147,6 +155,69 @@ public class ObjectTypeUtils {
return buffer;
}
+  /**
+   * Encodes an object value: the object size as a long, followed by the path in its object-value
+   * form.
+   */
+  public static Binary generateObjectBinary(long objectSize, IObjectPath objectPath) {
+    final int pathLength = objectPath.getSerializeSizeToObjectValue();
+    // Heap buffer sized exactly for the long plus the path, so array() is the complete value.
+    final ByteBuffer value = ByteBuffer.allocate(pathLength + Long.BYTES);
+    ReadWriteIOUtils.write(objectSize, value);
+    objectPath.serializeToObjectValue(value);
+    return new Binary(value.array());
+  }
+
+ public static DecoderWrapper getReplaceDecoder(final Decoder decoder, final
int newRegionId) { // wraps the given decoder in an ObjectRegionIdReplaceDecoder bound to newRegionId
+ return new ObjectRegionIdReplaceDecoder(decoder, newRegionId);
+ }
+
+  /** Decoder wrapper that rewrites the region id of every object binary it reads. */
+  private static class ObjectRegionIdReplaceDecoder extends DecoderWrapper {
+
+    private final int newRegionId;
+
+    public ObjectRegionIdReplaceDecoder(Decoder wrapped, int regionId) {
+      super(wrapped);
+      this.newRegionId = regionId;
+    }
+
+    /** Reads with the wrapped decoder, then substitutes the region id in the decoded value. */
+    @Override
+    public Binary readBinary(ByteBuffer buffer) {
+      return ObjectTypeUtils.replaceRegionIdForObjectBinary(
+          newRegionId, originDecoder.readBinary(buffer));
+    }
+  }
+
+  /**
+   * Returns an object binary whose path starts with {@code newRegionId}.
+   *
+   * <p>The first path element is parsed as the current region id. When it already equals {@code
+   * newRegionId} the original binary is returned unchanged; otherwise a new binary with the same
+   * object size and the rewritten path is generated.
+   *
+   * @throws IoTDBRuntimeException if the first path element is not an integer
+   */
+  public static Binary replaceRegionIdForObjectBinary(int newRegionId, Binary originValue) {
+    final Pair<Long, IObjectPath> sizeAndPath =
+        ObjectTypeUtils.parseObjectBinaryToSizeIObjectPathPair(originValue);
+    final IObjectPath oldObjectPath = sizeAndPath.getRight();
+    final boolean isPlain = oldObjectPath instanceof PlainObjectPath;
+    try {
+      final Path parsed =
+          isPlain
+              ? Paths.get(oldObjectPath.toString())
+              : ((Base32ObjectPath) oldObjectPath).getPath();
+      final int oldRegionId = Integer.parseInt(parsed.getName(0).toString());
+      if (oldRegionId == newRegionId) {
+        return originValue;
+      }
+
+      final IObjectPath replaced;
+      if (isPlain) {
+        // The plain path begins with the region id, so the first match is that leading element.
+        replaced =
+            new PlainObjectPath(
+                oldObjectPath.toString().replaceFirst(oldRegionId + "", newRegionId + ""));
+      } else {
+        // Keep every element after the region id and prepend the new id.
+        final int nameCount = parsed.getNameCount();
+        final String[] tail = new String[nameCount - 1];
+        for (int i = 1; i < nameCount; i++) {
+          tail[i - 1] = parsed.getName(i).toString();
+        }
+        replaced = new Base32ObjectPath(Paths.get(newRegionId + "", tail));
+      }
+      return ObjectTypeUtils.generateObjectBinary(sizeAndPath.getLeft(), replaced);
+    } catch (NumberFormatException e) {
+      throw new IoTDBRuntimeException(
+          "wrong object file path: " + sizeAndPath.getRight(),
+          TSStatusCode.OBJECT_READ_ERROR.getStatusCode());
+    }
+  }
+
public static int getActualReadSize(String filePath, long fileSize, long
offset, long length) {
if (offset >= fileSize) {
throw new SemanticException(
@@ -164,15 +235,23 @@ public class ObjectTypeUtils {
return (int) actualReadSize;
}
- public static Pair<Long, String> parseObjectBinary(Binary binary) {
+ public static Pair<Long, String>
parseObjectBinaryToSizeStringPathPair(Binary binary) { // splits an object Binary into (long read first, path rendered via toString())
byte[] bytes = binary.getValues();
ByteBuffer buffer = ByteBuffer.wrap(bytes);
long length = buffer.getLong();
String relativeObjectFilePath =
- new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET);
+
IObjectPath.getDeserializer().deserializeFromObjectValue(buffer).toString(); // getLong() already advanced the buffer, so only the bytes after the long are deserialized
return new Pair<>(length, relativeObjectFilePath);
}
+  /**
+   * Splits an object binary into its leading long and the {@link IObjectPath} deserialized from
+   * the bytes that follow it.
+   */
+  public static Pair<Long, IObjectPath> parseObjectBinaryToSizeIObjectPathPair(Binary binary) {
+    final ByteBuffer value = ByteBuffer.wrap(binary.getValues());
+    // Read the long first: it advances the buffer to where the path bytes begin.
+    final long objectSize = value.getLong();
+    return new Pair<>(
+        objectSize, IObjectPath.getDeserializer().deserializeFromObjectValue(value));
+  }
+
+
public static long getObjectLength(Binary binary) {
byte[] bytes = binary.getValues();
ByteBuffer wrap = ByteBuffer.wrap(bytes);
@@ -189,8 +268,9 @@ public class ObjectTypeUtils {
public static Optional<File> getNullableObjectPathFromBinary(
Binary binary, boolean needTempFile) {
byte[] bytes = binary.getValues();
+ ByteBuffer buffer = ByteBuffer.wrap(bytes, 8, bytes.length - 8); // view that skips the first 8 bytes and covers the rest of the value
String relativeObjectFilePath =
- new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET);
+
IObjectPath.getDeserializer().deserializeFromObjectValue(buffer).toString(); // path is rebuilt by the configured deserializer; its toString() is what gets resolved below
return TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath,
needTempFile);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java
index c5e745aea87..98f2428e8d5 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.queryengine.plan.function;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
@@ -54,10 +56,13 @@ import static org.junit.Assert.fail;
public class RecordObjectTypeTest {
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private File objectDir;
@Before
public void setup() {
+ config.setRestrictObjectLimit(true);
try {
objectDir = new
File(TierManager.getInstance().getNextFolderForObjectFile());
} catch (DiskSpaceInsufficientException e) {
@@ -73,6 +78,7 @@ public class RecordObjectTypeTest {
Files.delete(file.toPath());
}
}
+ config.setRestrictObjectLimit(false);
}
@Test
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java
index 0a0d73d2ac3..022257d1ee6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
@@ -48,10 +50,13 @@ import java.util.Optional;
public class ObjectTypeFunctionTest {
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private File objectDir;
@Before
public void setup() {
+ config.setRestrictObjectLimit(true);
try {
objectDir = new
File(TierManager.getInstance().getNextFolderForObjectFile());
} catch (DiskSpaceInsufficientException e) {
@@ -67,6 +72,7 @@ public class ObjectTypeFunctionTest {
Files.delete(file.toPath());
}
}
+ config.setRestrictObjectLimit(false);
}
@Test
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
index 4bc11fbec57..4fd7e8c432d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
@@ -23,9 +23,15 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+import org.apache.iotdb.db.storageengine.dataregion.Base32ObjectPath;
+import org.apache.iotdb.db.storageengine.dataregion.IObjectPath;
+import org.apache.iotdb.db.storageengine.dataregion.PlainObjectPath;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
@@ -36,6 +42,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.Sett
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
@@ -76,11 +83,15 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
private String threadName;
private File objectDir;
+ private File regionDir;
+
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@Before
@Override
public void setUp()
throws IOException, WriteProcessException, MetadataException,
InterruptedException {
+ config.setRestrictObjectLimit(true);
this.threadName = Thread.currentThread().getName();
Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
@@ -88,6 +99,8 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
super.setUp();
try {
objectDir = new
File(TierManager.getInstance().getNextFolderForObjectFile());
+ regionDir = new File(objectDir, "0");
+ regionDir.mkdirs();
} catch (DiskSpaceInsufficientException e) {
throw new RuntimeException(e);
}
@@ -102,9 +115,10 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
File[] files = objectDir.listFiles();
if (files != null) {
for (File file : files) {
- Files.delete(file.toPath());
+ FileUtils.deleteFileOrDirectory(file);
}
}
+ config.setRestrictObjectLimit(false);
}
public void createTable(String tableName, long ttl) {
@@ -120,9 +134,9 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
@Test
public void testSeqCompactionWithTTL() throws IOException,
WriteProcessException {
Pair<TsFileResource, File> pair1 =
- generateTsFileAndObject(true, System.currentTimeMillis() - 10000);
+ generateTsFileAndObject(true, System.currentTimeMillis() - 10000, 0);
Pair<TsFileResource, File> pair2 =
- generateTsFileAndObject(true, System.currentTimeMillis() + 1000000);
+ generateTsFileAndObject(true, System.currentTimeMillis() + 1000000,
100);
tsFileManager.add(pair1.getLeft(), true);
tsFileManager.add(pair2.getLeft(), true);
InnerSpaceCompactionTask task =
@@ -141,9 +155,9 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
@Test
public void testUnseqCompactionWithTTL() throws IOException,
WriteProcessException {
Pair<TsFileResource, File> pair1 =
- generateTsFileAndObject(false, System.currentTimeMillis() + 100000);
+ generateTsFileAndObject(false, System.currentTimeMillis() + 100000, 1);
Pair<TsFileResource, File> pair2 =
- generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+ generateTsFileAndObject(false, System.currentTimeMillis() - 1000000,
0);
tsFileManager.add(pair1.getLeft(), false);
tsFileManager.add(pair2.getLeft(), false);
InnerSpaceCompactionTask task =
@@ -162,9 +176,9 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
@Test
public void testUnseqCompactionWithReadPointWithTTL() throws IOException,
WriteProcessException {
Pair<TsFileResource, File> pair1 =
- generateTsFileAndObject(false, System.currentTimeMillis() + 100000);
+ generateTsFileAndObject(false, System.currentTimeMillis() + 100000, 0);
Pair<TsFileResource, File> pair2 =
- generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+ generateTsFileAndObject(false, System.currentTimeMillis() - 1000000,
0);
tsFileManager.add(pair1.getLeft(), false);
tsFileManager.add(pair2.getLeft(), false);
InnerSpaceCompactionTask task =
@@ -183,9 +197,9 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
@Test
public void testCrossCompactionWithTTL() throws IOException,
WriteProcessException {
Pair<TsFileResource, File> pair1 =
- generateTsFileAndObject(true, System.currentTimeMillis() + 100000);
+ generateTsFileAndObject(true, System.currentTimeMillis() + 100000, 1);
Pair<TsFileResource, File> pair2 =
- generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+ generateTsFileAndObject(false, System.currentTimeMillis() - 1000000,
2);
tsFileManager.add(pair1.getLeft(), true);
tsFileManager.add(pair2.getLeft(), false);
CrossSpaceCompactionTask task =
@@ -205,9 +219,9 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
@Test
public void testSettleCompaction() throws IOException, WriteProcessException
{
Pair<TsFileResource, File> pair1 =
- generateTsFileAndObject(true, System.currentTimeMillis() - 10000);
+ generateTsFileAndObject(true, System.currentTimeMillis() - 10000, 3);
Pair<TsFileResource, File> pair2 =
- generateTsFileAndObject(true, System.currentTimeMillis() + 1000000);
+ generateTsFileAndObject(true, System.currentTimeMillis() + 1000000, 0);
tsFileManager.add(pair1.getLeft(), true);
tsFileManager.add(pair2.getLeft(), true);
SettleCompactionTask task =
@@ -224,19 +238,58 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
Assert.assertTrue(pair2.getRight().exists());
}
- private Pair<TsFileResource, File> generateTsFileAndObject(boolean seq, long
timestamp)
- throws IOException, WriteProcessException {
+ @Test
+ public void testPlainObjectBinaryReplaceRegionId() {
+ IObjectPath objectPath = new PlainObjectPath(1, 0, new
StringArrayDeviceID("t1.d1"), "s1");
+ ByteBuffer buffer =
+ ByteBuffer.allocate(Long.BYTES +
objectPath.getSerializeSizeToObjectValue());
+ buffer.putLong(10);
+ objectPath.serializeToObjectValue(buffer);
+
+ Binary origin = new Binary(buffer.array());
+ Binary result = ObjectTypeUtils.replaceRegionIdForObjectBinary(10, origin);
+ ByteBuffer deserializeBuffer = ByteBuffer.wrap(result.getValues());
+ deserializeBuffer.getLong();
+ Assert.assertEquals(
+ new PlainObjectPath(10, 0, new StringArrayDeviceID("t1.d1"),
"s1").toString(),
+
IObjectPath.getDeserializer().deserializeFromObjectValue(deserializeBuffer).toString());
+ }
+
+ @Test
+ public void testBase32ObjectBinaryReplaceRegionId() {
+ config.setRestrictObjectLimit(false);
+ try {
+ IObjectPath objectPath = new Base32ObjectPath(1, 0, new
StringArrayDeviceID("t1.d1"), "s1");
+ ByteBuffer buffer =
+ ByteBuffer.allocate(Long.BYTES +
objectPath.getSerializeSizeToObjectValue());
+ buffer.putLong(10);
+ objectPath.serializeToObjectValue(buffer);
+
+ Binary origin = new Binary(buffer.array());
+ Binary result = ObjectTypeUtils.replaceRegionIdForObjectBinary(10,
origin);
+ ByteBuffer deserializeBuffer = ByteBuffer.wrap(result.getValues());
+ deserializeBuffer.getLong();
+ Assert.assertEquals(
+ new Base32ObjectPath(10, 0, new StringArrayDeviceID("t1.d1"),
"s1").toString(),
+
IObjectPath.getDeserializer().deserializeFromObjectValue(deserializeBuffer).toString());
+ } finally {
+ config.setRestrictObjectLimit(true);
+ }
+ }
+
+ private Pair<TsFileResource, File> generateTsFileAndObject(
+ boolean seq, long timestamp, int regionIdInTsFile) throws IOException,
WriteProcessException {
TsFileResource resource = createEmptyFileAndResource(seq);
- Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin");
+ Path testFile1 = Files.createTempFile(regionDir.toPath(), "test_", ".bin");
byte[] content = new byte[100];
for (int i = 0; i < 100; i++) {
content[i] = (byte) i;
}
Files.write(testFile1, content);
- String relativePath = testFile1.toFile().getName();
- ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES +
relativePath.length());
+ String relativePathInTsFile = regionIdInTsFile + File.separator +
testFile1.toFile().getName();
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES +
relativePathInTsFile.length());
buffer.putLong(100L);
- buffer.put(BytesUtils.stringToBytes(relativePath));
+ buffer.put(BytesUtils.stringToBytes(relativePathInTsFile));
buffer.flip();
IDeviceID deviceID = new StringArrayDeviceID("t1", "d1");
try (TsFileIOWriter writer = new TsFileIOWriter(resource.getTsFile())) {
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 0a9a47ee890..4330a11b1f6 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1307,6 +1307,11 @@ tier_ttl_in_ms=-1
# Datatype: long
max_object_file_size_in_byte=4294967296
+# Whether to restrict the table names, column names, and device names used with
the OBJECT type. If false, no special restrictions are applied to these names.
+# effectiveMode: first_start
+# Datatype: boolean
+restrict_object_limit=false
+
####################
### Compaction Configurations
####################
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index f2b8ec6b8b0..01bde7c378d 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -50,6 +50,7 @@ struct TGlobalConfig {
11: optional i32 tagAttributeTotalSize
12: optional bool isEnterprise
13: optional i64 timePartitionOrigin
+ 14: optional bool restrictObjectLimit
}
struct TRatisConfig {