This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8e650cbb1e1 Pipe: Reduced the log of epoch switching & Optimized the
memory calculation of insertion event & Refactor & Optimized the memory
reservation logic of tsFile parser provider & Added the missing parsing logic +
Fixed the wrong listening types to table meta sync & Skipped the file parsing
in privilege for empty file and root user & Subscription IT: assertGte for
received tsfile count (#15068)
8e650cbb1e1 is described below
commit 8e650cbb1e1ca9cc9023b87cdfc4ae7ec5cc090e
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 18 19:49:04 2025 +0800
Pipe: Reduced the log of epoch switching & Optimized the memory calculation
of insertion event & Refactor & Optimized the memory reservation logic of
tsFile parser provider & Added the missing parsing logic + Fixed the wrong
listening types to table meta sync & Skipped the file parsing in privilege for
empty file and root user & Subscription IT: assertGte for received tsfile count
(#15068)
Co-authored-by: VGalaxies <[email protected]>
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../IoTDBPathLooseDeviceTsfilePushConsumerIT.java | 10 +--
.../IoTDBTimeLooseTsfilePushConsumerIT.java | 10 +--
.../IoTDBTSPatternTsfilePushConsumerIT.java | 2 +-
.../time/IoTDBRealTimeDBTsfilePushConsumerIT.java | 8 +--
.../time/IoTDBTimeRangeDBTsfilePushConsumerIT.java | 22 +++---
.../write/pipe/payload/PipeDeleteDevicesPlan.java | 44 +++---------
.../request/write/table/AbstractTablePlan.java | 15 +++-
.../extractor/ConfigRegionListeningFilter.java | 3 +-
...ConfigPhysicalPlanTablePatternParseVisitor.java | 62 +++++++---------
...nfigPhysicalPlanTablePrivilegeParseVisitor.java | 82 +++++++++++++---------
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 37 +---------
...igPhysicalPlanTablePatternParseVisitorTest.java | 9 +++
.../protocol/opcda/OpcDaServerHandle.java | 4 +-
.../connector/protocol/opcua/OpcUaNameSpace.java | 4 +-
.../async/IoTDBDataRegionAsyncConnector.java | 21 ------
.../db/pipe/event/common/PipeInsertionEvent.java | 13 ----
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 9 ++-
.../common/tablet/PipeRawTabletInsertionEvent.java | 4 +-
.../tablet/parser/TabletInsertionEventParser.java | 15 ++--
.../common/tsfile/PipeTsFileInsertionEvent.java | 70 ++++++++++++------
.../parser/TsFileInsertionEventParserProvider.java | 19 +++--
.../PipeRealtimeDataRegionHybridExtractor.java | 12 ++--
.../pipe/resource/tsfile/PipeTsFileResource.java | 2 +-
23 files changed, 224 insertions(+), 253 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBPathLooseDeviceTsfilePushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBPathLooseDeviceTsfilePushConsumerIT.java
index d6fbe7676e9..82b4d42c74e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBPathLooseDeviceTsfilePushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBPathLooseDeviceTsfilePushConsumerIT.java
@@ -199,7 +199,7 @@ public class IoTDBPathLooseDeviceTsfilePushConsumerIT
extends AbstractSubscripti
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 1);
+ assertGte(onReceive.get(), 1);
assertGte(rowCounts.get(0).get(), 3, "Write data before
subscription" + device);
assertGte(rowCounts.get(0).get(), 3, "Write data before
subscription" + device2);
});
@@ -211,7 +211,7 @@ public class IoTDBPathLooseDeviceTsfilePushConsumerIT
extends AbstractSubscripti
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 1);
+ assertGte(onReceive.get(), 1);
assertGte(rowCounts.get(0).get(), 3, "Write out-of-range data" +
device);
assertGte(rowCounts.get(0).get(), 3, "Write out-of-range data" +
device2);
});
@@ -223,7 +223,7 @@ public class IoTDBPathLooseDeviceTsfilePushConsumerIT
extends AbstractSubscripti
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 2);
+ assertGte(onReceive.get(), 2);
assertGte(rowCounts.get(0).get(), 8, "write data" + device);
assertGte(rowCounts.get(0).get(), 8, "write data " + device2);
});
@@ -235,7 +235,7 @@ public class IoTDBPathLooseDeviceTsfilePushConsumerIT
extends AbstractSubscripti
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 3);
+ assertGte(onReceive.get(), 3);
assertGte(rowCounts.get(0).get(), 10, "Write data: end boundary at "
+ device);
assertGte(rowCounts.get(0).get(), 10, "Write data: end boundary at "
+ device2);
});
@@ -246,7 +246,7 @@ public class IoTDBPathLooseDeviceTsfilePushConsumerIT
extends AbstractSubscripti
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 3);
+ assertGte(onReceive.get(), 3);
assertGte(rowCounts.get(0).get(), 10, "Write data: > end " + device);
assertGte(rowCounts.get(0).get(), 10, "Write data: > end " +
device2);
});
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsfilePushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsfilePushConsumerIT.java
index 5571f1190b9..4691dd2be7f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsfilePushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsfilePushConsumerIT.java
@@ -186,7 +186,7 @@ public class IoTDBTimeLooseTsfilePushConsumerIT extends
AbstractSubscriptionRegr
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 1);
+ assertGte(onReceive.get(), 1);
assertGte(rowCount.get(), 3);
});
@@ -197,7 +197,7 @@ public class IoTDBTimeLooseTsfilePushConsumerIT extends
AbstractSubscriptionRegr
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 1);
+ assertGte(onReceive.get(), 1);
assertGte(rowCount.get(), 3);
});
@@ -208,7 +208,7 @@ public class IoTDBTimeLooseTsfilePushConsumerIT extends
AbstractSubscriptionRegr
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 2);
+ assertGte(onReceive.get(), 2);
assertGte(rowCount.get(), 8);
});
@@ -219,7 +219,7 @@ public class IoTDBTimeLooseTsfilePushConsumerIT extends
AbstractSubscriptionRegr
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 3);
+ assertGte(onReceive.get(), 3);
assertGte(rowCount.get(), 10);
});
@@ -230,7 +230,7 @@ public class IoTDBTimeLooseTsfilePushConsumerIT extends
AbstractSubscriptionRegr
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 3);
+ assertGte(onReceive.get(), 3);
assertGte(rowCount.get(), 10);
});
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
index bd4f94f8563..4b9bda04ead 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
@@ -218,7 +218,7 @@ public class IoTDBTSPatternTsfilePushConsumerIT extends
AbstractSubscriptionRegr
System.out.println(FORMAT.format(new Date()) + " src:" +
getCount(session_src, sql));
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceiveCount.get(), 2, "receive files over 2");
+ assertGte(onReceiveCount.get(), 2, "receive files over 2");
assertEquals(rowCounts.get(0).get(), 25, device + ".s_0");
assertEquals(rowCounts.get(1).get(), 0, device + ".s_1");
assertEquals(rowCounts.get(2).get(), 0, database + ".d_1.s_0");
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBTsfilePushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBTsfilePushConsumerIT.java
index b1fe9d07937..3e465bb9710 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBTsfilePushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBTsfilePushConsumerIT.java
@@ -164,8 +164,8 @@ public class IoTDBRealTimeDBTsfilePushConsumerIT extends
AbstractSubscriptionReg
insert_data(System.currentTimeMillis());
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 1, "should process 1 file");
- assertEquals(rowCount.get(), 4, "4 records");
+ assertGte(onReceive.get(), 1, "should process 1 file");
+ assertGte(rowCount.get(), 4, "4 records");
});
// Subscribe and then write data
@@ -173,8 +173,8 @@ public class IoTDBRealTimeDBTsfilePushConsumerIT extends
AbstractSubscriptionReg
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 2, "should process 2 file");
- assertEquals(rowCount.get(), 8, "8 records");
+ assertGte(onReceive.get(), 2, "should process 2 file");
+ assertGte(rowCount.get(), 8, "8 records");
});
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBTsfilePushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBTsfilePushConsumerIT.java
index 2107d4e9bc1..fa97e511f30 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBTsfilePushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBTsfilePushConsumerIT.java
@@ -162,38 +162,36 @@ public class IoTDBTimeRangeDBTsfilePushConsumerIT extends
AbstractSubscriptionRe
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 1);
- // loose-time should 2 records,get 4 records
- assertTrue(rowCount.get() >= 2);
+ assertGte(onReceive.get(), 1);
+ assertGte(rowCount.get(), 2);
});
insert_data(System.currentTimeMillis()); // now, not in range
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 1);
- assertTrue(rowCount.get() >= 2);
+ assertGte(onReceive.get(), 1);
+ assertGte(rowCount.get(), 2);
});
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 2);
- assertTrue(rowCount.get() >= 6);
+ assertGte(onReceive.get(), 2);
+ assertGte(rowCount.get(), 6);
});
insert_data(1711814398000L); // 2024-03-30 23:59:58+08:00
AWAIT.untilAsserted(
() -> {
- // Because the end time is 2024-03-31 00:00:00, closed interval
- assertEquals(onReceive.get(), 3);
- assertTrue(rowCount.get() >= 8);
+ assertGte(onReceive.get(), 3);
+ assertGte(rowCount.get(), 8);
});
insert_data(1711900798000L); // 2024-03-31 23:59:58+08:00
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceive.get(), 3);
- assertTrue(rowCount.get() >= 8);
+ assertGte(onReceive.get(), 3);
+ assertGte(rowCount.get(), 8);
});
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeleteDevicesPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeleteDevicesPlan.java
index 90bfd9c433f..35dc0e929d9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeleteDevicesPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeleteDevicesPlan.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.confignode.consensus.request.write.pipe.payload;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import
org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -32,9 +32,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
-public class PipeDeleteDevicesPlan extends ConfigPhysicalPlan {
- private String database;
- private String tableName;
+public class PipeDeleteDevicesPlan extends AbstractTablePlan {
private byte[] patternBytes;
private byte[] filterBytes;
private byte[] modBytes;
@@ -49,22 +47,12 @@ public class PipeDeleteDevicesPlan extends
ConfigPhysicalPlan {
final @Nonnull byte[] patternBytes,
final @Nonnull byte[] filterBytes,
final @Nonnull byte[] modBytes) {
- super(ConfigPhysicalPlanType.PipeDeleteDevices);
- this.database = database;
- this.tableName = tableName;
+ super(ConfigPhysicalPlanType.PipeDeleteDevices, database, tableName);
this.patternBytes = patternBytes;
this.filterBytes = filterBytes;
this.modBytes = modBytes;
}
- public String getDatabase() {
- return database;
- }
-
- public String getTableName() {
- return tableName;
- }
-
public byte[] getPatternBytes() {
return patternBytes;
}
@@ -79,9 +67,7 @@ public class PipeDeleteDevicesPlan extends ConfigPhysicalPlan
{
@Override
protected void serializeImpl(final DataOutputStream stream) throws
IOException {
- stream.writeShort(getType().getPlanType());
- ReadWriteIOUtils.write(database, stream);
- ReadWriteIOUtils.write(tableName, stream);
+ super.serializeImpl(stream);
ReadWriteIOUtils.write(patternBytes.length, stream);
stream.write(patternBytes);
ReadWriteIOUtils.write(filterBytes.length, stream);
@@ -92,8 +78,7 @@ public class PipeDeleteDevicesPlan extends ConfigPhysicalPlan
{
@Override
protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
- this.database = ReadWriteIOUtils.readString(buffer);
- this.tableName = ReadWriteIOUtils.readString(buffer);
+ super.deserializeImpl(buffer);
patternBytes = new byte[ReadWriteIOUtils.readInt(buffer)];
buffer.get(patternBytes);
filterBytes = new byte[ReadWriteIOUtils.readInt(buffer)];
@@ -104,25 +89,16 @@ public class PipeDeleteDevicesPlan extends
ConfigPhysicalPlan {
@Override
public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- final PipeDeleteDevicesPlan that = (PipeDeleteDevicesPlan) obj;
- return Objects.equals(this.database, that.database)
- && Objects.equals(this.tableName, that.tableName)
- && Arrays.equals(this.patternBytes, that.patternBytes)
- && Arrays.equals(this.filterBytes, that.filterBytes)
- && Arrays.equals(this.modBytes, that.modBytes);
+ return super.equals(obj)
+ && Arrays.equals(this.patternBytes, ((PipeDeleteDevicesPlan)
obj).patternBytes)
+ && Arrays.equals(this.filterBytes, ((PipeDeleteDevicesPlan)
obj).filterBytes)
+ && Arrays.equals(this.modBytes, ((PipeDeleteDevicesPlan)
obj).modBytes);
}
@Override
public int hashCode() {
return Objects.hash(
- database,
- tableName,
+ super.hashCode(),
Arrays.hashCode(patternBytes),
Arrays.hashCode(filterBytes),
Arrays.hashCode(modBytes));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AbstractTablePlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AbstractTablePlan.java
index 6ee66ca94d4..98d50e4ca33 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AbstractTablePlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AbstractTablePlan.java
@@ -27,8 +27,9 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Objects;
-abstract class AbstractTablePlan extends ConfigPhysicalPlan {
+public abstract class AbstractTablePlan extends ConfigPhysicalPlan {
private String database;
@@ -53,6 +54,18 @@ abstract class AbstractTablePlan extends ConfigPhysicalPlan {
return tableName;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final AbstractTablePlan that = (AbstractTablePlan) o;
+ return Objects.equals(database, that.database) &&
Objects.equals(tableName, that.tableName);
+ }
+
@Override
protected void serializeImpl(final DataOutputStream stream) throws
IOException {
stream.writeShort(getType().getPlanType());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
index 9af665d74e0..82d23b76350 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
@@ -171,6 +171,7 @@ public class ConfigRegionListeningFilter {
Collections.unmodifiableList(
Arrays.asList(
ConfigPhysicalPlanType.CreateUser,
+ ConfigPhysicalPlanType.RCreateUser,
ConfigPhysicalPlanType.CreateUserWithRawPassword)));
OPTION_PLAN_MAP.put(
new PartialPath("auth.user.alter"),
@@ -180,7 +181,7 @@ public class ConfigRegionListeningFilter {
OPTION_PLAN_MAP.put(
new PartialPath("auth.user.drop"),
Collections.unmodifiableList(
- Arrays.asList(ConfigPhysicalPlanType.DropUser,
ConfigPhysicalPlanType.RUpdateUser)));
+ Arrays.asList(ConfigPhysicalPlanType.DropUser,
ConfigPhysicalPlanType.RDropUser)));
// Both
OPTION_PLAN_MAP.put(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitor.java
index 42eeda61284..1d7fd17b628 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitor.java
@@ -27,6 +27,8 @@ import
org.apache.iotdb.confignode.consensus.request.write.auth.AuthorRelational
import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan;
@@ -77,10 +79,9 @@ public class PipeConfigPhysicalPlanTablePatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitPipeCreateTable(
final PipeCreateTablePlan pipeCreateTablePlan, final TablePattern
pattern) {
- return matchDatabaseAndTableName(
- pipeCreateTablePlan.getDatabase(),
- pipeCreateTablePlan.getTable().getTableName(),
- pattern)
+ return pattern.matchesDatabase(
+
PathUtils.unQualifyDatabaseName(pipeCreateTablePlan.getDatabase()))
+ &&
pattern.matchesTable(pipeCreateTablePlan.getTable().getTableName())
? Optional.of(pipeCreateTablePlan)
: Optional.empty();
}
@@ -88,72 +89,57 @@ public class PipeConfigPhysicalPlanTablePatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitAddTableColumn(
final AddTableColumnPlan addTableColumnPlan, final TablePattern pattern)
{
- return matchDatabaseAndTableName(
- addTableColumnPlan.getDatabase(),
addTableColumnPlan.getTableName(), pattern)
- ? Optional.of(addTableColumnPlan)
- : Optional.empty();
+ return visitAbstractTablePlan(addTableColumnPlan, pattern);
}
@Override
public Optional<ConfigPhysicalPlan> visitSetTableProperties(
final SetTablePropertiesPlan setTablePropertiesPlan, final TablePattern
pattern) {
- return matchDatabaseAndTableName(
- setTablePropertiesPlan.getDatabase(),
setTablePropertiesPlan.getTableName(), pattern)
- ? Optional.of(setTablePropertiesPlan)
- : Optional.empty();
+ return visitAbstractTablePlan(setTablePropertiesPlan, pattern);
}
@Override
public Optional<ConfigPhysicalPlan> visitCommitDeleteColumn(
final CommitDeleteColumnPlan commitDeleteColumnPlan, final TablePattern
pattern) {
- return matchDatabaseAndTableName(
- commitDeleteColumnPlan.getDatabase(),
commitDeleteColumnPlan.getTableName(), pattern)
- ? Optional.of(commitDeleteColumnPlan)
- : Optional.empty();
+ return visitAbstractTablePlan(commitDeleteColumnPlan, pattern);
}
@Override
public Optional<ConfigPhysicalPlan> visitRenameTableColumn(
final RenameTableColumnPlan renameTableColumnPlan, final TablePattern
pattern) {
- return matchDatabaseAndTableName(
- renameTableColumnPlan.getDatabase(),
renameTableColumnPlan.getTableName(), pattern)
- ? Optional.of(renameTableColumnPlan)
- : Optional.empty();
+ return visitAbstractTablePlan(renameTableColumnPlan, pattern);
}
@Override
public Optional<ConfigPhysicalPlan> visitCommitDeleteTable(
final CommitDeleteTablePlan commitDeleteTablePlan, final TablePattern
pattern) {
- return matchDatabaseAndTableName(
- commitDeleteTablePlan.getDatabase(),
commitDeleteTablePlan.getTableName(), pattern)
- ? Optional.of(commitDeleteTablePlan)
- : Optional.empty();
+ return visitAbstractTablePlan(commitDeleteTablePlan, pattern);
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitPipeDeleteDevices(
+ final PipeDeleteDevicesPlan pipeDeleteDevicesPlan, final TablePattern
pattern) {
+ return visitAbstractTablePlan(pipeDeleteDevicesPlan, pattern);
}
@Override
public Optional<ConfigPhysicalPlan> visitSetTableComment(
final SetTableCommentPlan setTableCommentPlan, final TablePattern
pattern) {
- return matchDatabaseAndTableName(
- setTableCommentPlan.getDatabase(),
setTableCommentPlan.getTableName(), pattern)
- ? Optional.of(setTableCommentPlan)
- : Optional.empty();
+ return visitAbstractTablePlan(setTableCommentPlan, pattern);
}
@Override
public Optional<ConfigPhysicalPlan> visitSetTableColumnComment(
final SetTableColumnCommentPlan setTableColumnCommentPlan, final
TablePattern pattern) {
- return matchDatabaseAndTableName(
- setTableColumnCommentPlan.getDatabase(),
- setTableColumnCommentPlan.getTableName(),
- pattern)
- ? Optional.of(setTableColumnCommentPlan)
- : Optional.empty();
+ return visitAbstractTablePlan(setTableColumnCommentPlan, pattern);
}
- private boolean matchDatabaseAndTableName(
- final String database, final String tableName, final TablePattern
pattern) {
- return pattern.matchesDatabase(PathUtils.unQualifyDatabaseName(database))
- && pattern.matchesTable(tableName);
+ private Optional<ConfigPhysicalPlan> visitAbstractTablePlan(
+ final AbstractTablePlan plan, final TablePattern pattern) {
+ return
pattern.matchesDatabase(PathUtils.unQualifyDatabaseName(plan.getDatabase()))
+ && pattern.matchesTable(plan.getTableName())
+ ? Optional.of(plan)
+ : Optional.empty();
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePrivilegeParseVisitor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePrivilegeParseVisitor.java
index e5f4a170f06..28c6fb8c4cf 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePrivilegeParseVisitor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePrivilegeParseVisitor.java
@@ -27,10 +27,14 @@ import
org.apache.iotdb.confignode.consensus.request.write.auth.AuthorRelational
import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.RenameTableColumnPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.table.SetTableColumnCommentPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.table.SetTableCommentPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.SetTablePropertiesPlan;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -90,10 +94,18 @@ public class
PipeConfigPhysicalPlanTablePrivilegeParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitPipeCreateTable(
final PipeCreateTablePlan pipeCreateTablePlan, final String userName) {
- return matchDatabaseAndTableName(
- pipeCreateTablePlan.getDatabase(),
- pipeCreateTablePlan.getTable().getTableName(),
- userName)
+ return ConfigNode.getInstance()
+ .getConfigManager()
+ .getPermissionManager()
+ .checkUserPrivileges(
+ userName,
+ new PrivilegeUnion(
+ pipeCreateTablePlan.getDatabase(),
+ pipeCreateTablePlan.getTable().getTableName(),
+ null))
+ .getStatus()
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? Optional.of(pipeCreateTablePlan)
: Optional.empty();
}
@@ -101,57 +113,63 @@ public class
PipeConfigPhysicalPlanTablePrivilegeParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitAddTableColumn(
final AddTableColumnPlan addTableColumnPlan, final String userName) {
- return matchDatabaseAndTableName(
- addTableColumnPlan.getDatabase(),
addTableColumnPlan.getTableName(), userName)
- ? Optional.of(addTableColumnPlan)
- : Optional.empty();
+ return visitAbstractTablePlan(addTableColumnPlan, userName);
}
@Override
public Optional<ConfigPhysicalPlan> visitSetTableProperties(
final SetTablePropertiesPlan setTablePropertiesPlan, final String
userName) {
- return matchDatabaseAndTableName(
- setTablePropertiesPlan.getDatabase(),
setTablePropertiesPlan.getTableName(), userName)
- ? Optional.of(setTablePropertiesPlan)
- : Optional.empty();
+ return visitAbstractTablePlan(setTablePropertiesPlan, userName);
}
@Override
public Optional<ConfigPhysicalPlan> visitCommitDeleteColumn(
final CommitDeleteColumnPlan commitDeleteColumnPlan, final String
userName) {
- return matchDatabaseAndTableName(
- commitDeleteColumnPlan.getDatabase(),
commitDeleteColumnPlan.getTableName(), userName)
- ? Optional.of(commitDeleteColumnPlan)
- : Optional.empty();
+ return visitAbstractTablePlan(commitDeleteColumnPlan, userName);
}
@Override
public Optional<ConfigPhysicalPlan> visitRenameTableColumn(
final RenameTableColumnPlan renameTableColumnPlan, final String
userName) {
- return matchDatabaseAndTableName(
- renameTableColumnPlan.getDatabase(),
renameTableColumnPlan.getTableName(), userName)
- ? Optional.of(renameTableColumnPlan)
- : Optional.empty();
+ return visitAbstractTablePlan(renameTableColumnPlan, userName);
}
@Override
public Optional<ConfigPhysicalPlan> visitCommitDeleteTable(
final CommitDeleteTablePlan commitDeleteTablePlan, final String
userName) {
- return matchDatabaseAndTableName(
- commitDeleteTablePlan.getDatabase(),
commitDeleteTablePlan.getTableName(), userName)
- ? Optional.of(commitDeleteTablePlan)
- : Optional.empty();
+ return visitAbstractTablePlan(commitDeleteTablePlan, userName);
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitPipeDeleteDevices(
+ final PipeDeleteDevicesPlan pipeDeleteDevicesPlan, final String
userName) {
+ return visitAbstractTablePlan(pipeDeleteDevicesPlan, userName);
}
- private boolean matchDatabaseAndTableName(
- final String database, final String tableName, final String userName) {
+ @Override
+ public Optional<ConfigPhysicalPlan> visitSetTableComment(
+ final SetTableCommentPlan setTableCommentPlan, final String userName) {
+ return visitAbstractTablePlan(setTableCommentPlan, userName);
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitSetTableColumnComment(
+ final SetTableColumnCommentPlan setTableColumnCommentPlan, final String
userName) {
+ return visitAbstractTablePlan(setTableColumnCommentPlan, userName);
+ }
+
+ private Optional<ConfigPhysicalPlan> visitAbstractTablePlan(
+ final AbstractTablePlan plan, final String userName) {
return ConfigNode.getInstance()
- .getConfigManager()
- .getPermissionManager()
- .checkUserPrivileges(userName, new PrivilegeUnion(database,
tableName, null))
- .getStatus()
- .getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ .getConfigManager()
+ .getPermissionManager()
+ .checkUserPrivileges(
+ userName, new PrivilegeUnion(plan.getDatabase(),
plan.getTableName(), null))
+ .getStatus()
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? Optional.of(plan)
+ : Optional.empty();
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 4cbb001fb09..b1a3dfac009 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -56,6 +56,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDele
import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeUnsetSchemaTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan;
@@ -334,48 +335,16 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
PrivilegeType.CREATE))
.getStatus();
case AddTableColumn:
- return configManager
- .checkUserPrivileges(
- username,
- new PrivilegeUnion(
- ((AddTableColumnPlan) plan).getDatabase(),
- ((AddTableColumnPlan) plan).getTableName(),
- PrivilegeType.ALTER))
- .getStatus();
case SetTableProperties:
- return configManager
- .checkUserPrivileges(
- username,
- new PrivilegeUnion(
- ((SetTablePropertiesPlan) plan).getDatabase(),
- ((SetTablePropertiesPlan) plan).getTableName(),
- PrivilegeType.ALTER))
- .getStatus();
case CommitDeleteColumn:
- return configManager
- .checkUserPrivileges(
- username,
- new PrivilegeUnion(
- ((CommitDeleteColumnPlan) plan).getDatabase(),
- ((CommitDeleteColumnPlan) plan).getTableName(),
- PrivilegeType.ALTER))
- .getStatus();
case SetTableComment:
- return configManager
- .checkUserPrivileges(
- username,
- new PrivilegeUnion(
- ((SetTableCommentPlan) plan).getDatabase(),
- ((SetTableCommentPlan) plan).getTableName(),
- PrivilegeType.ALTER))
- .getStatus();
case SetTableColumnComment:
return configManager
.checkUserPrivileges(
username,
new PrivilegeUnion(
- ((SetTableColumnCommentPlan) plan).getDatabase(),
- ((SetTableColumnCommentPlan) plan).getTableName(),
+ ((AbstractTablePlan) plan).getDatabase(),
+ ((AbstractTablePlan) plan).getTableName(),
PrivilegeType.ALTER))
.getStatus();
case CommitDeleteTable:
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
index df6c5050eb9..65db59338c1 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.auth.AuthorRelational
import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan;
@@ -116,6 +117,14 @@ public class
PipeConfigPhysicalPlanTablePatternParseVisitorTest {
new CommitDeleteTablePlan("da", "ac"));
}
+ @Test
+ public void testPipeDeleteDevices() {
+ testInput(
+ new PipeDeleteDevicesPlan("db1", "ab", new byte[0], new byte[0], new
byte[0]),
+ new PipeDeleteDevicesPlan("db1", "ac", new byte[0], new byte[0], new
byte[0]),
+ new PipeDeleteDevicesPlan("da", "ac", new byte[0], new byte[0], new
byte[0]));
+ }
+
@Test
public void testSetTableComment() {
testInput(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
index 1c9e8c77315..167fd4988f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
@@ -194,9 +194,7 @@ public class OpcDaServerHandle implements Closeable {
addItem(itemId, schemas.get(i).getType());
}
for (int j = tablet.getRowSize() - 1; j >= 0; --j) {
- if (Objects.isNull(tablet.getBitMaps())
- || Objects.isNull(tablet.getBitMaps()[i])
- || !tablet.getBitMaps()[i].isMarked(j)) {
+ if (!tablet.isNull(j, i)) {
if (serverTimestampMap.get(itemId) <= tablet.getTimestamp(j)) {
writeData(
itemId,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
index 8beccbde59b..acb759df111 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
@@ -124,9 +124,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
for (int i = 0; i < schemas.size(); ++i) {
for (int j = tablet.getRowSize() - 1; j >= 0; --j) {
- if (Objects.isNull(tablet.getBitMaps())
- || Objects.isNull(tablet.getBitMaps()[i])
- || !tablet.getBitMaps()[i].isMarked(j)) {
+ if (!tablet.isNull(j, i)) {
newSchemas.add(schemas.get(i));
timestamps.add(tablet.getTimestamp(j));
values.add(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index d3c3c73b728..4873e1d1fb7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -614,27 +614,6 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
return retryEventQueue.size();
}
- // For performance, this will not acquire lock and does not guarantee the correct
- // result. However, this shall not cause any exceptions when concurrently read & written.
- public int getRetryEventCount(final String pipeName) {
- final AtomicInteger count = new AtomicInteger(0);
- try {
- retryEventQueue.forEach(
- event -> {
- if (event instanceof EnrichedEvent
- && pipeName.equals(((EnrichedEvent) event).getPipeName())) {
- count.incrementAndGet();
- }
- });
- return count.get();
- } catch (final Exception e) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Failed to get retry event count for pipe {}.", pipeName,
e);
- }
- return count.get();
- }
- }
-
//////////////////////// APIs provided for PipeTransferTrackableHandler ////////////////////////
public boolean isClosed() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
index 7b0437fba06..5be67305d64 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
@@ -28,9 +28,6 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import javax.validation.constraints.NotNull;
-import java.util.HashSet;
-import java.util.Set;
-
/**
* The data model used to record the Event and the data model of the DataRegion corresponding to the
* source data, so this type requires some specifications .
@@ -60,8 +57,6 @@ public abstract class PipeInsertionEvent extends
EnrichedEvent {
protected String treeModelDatabaseName; // lazy initialization
protected String tableModelDatabaseName; // lazy initialization
- private Set<String> noPrivilegeTableNames = new HashSet<>();
-
protected PipeInsertionEvent(
final String pipeName,
final long creationTime,
@@ -174,12 +169,4 @@ public abstract class PipeInsertionEvent extends
EnrichedEvent {
this.tableModelDatabaseName = tableModelDatabaseName.toLowerCase();
this.treeModelDatabaseName =
PathUtils.qualifyDatabaseName(tableModelDatabaseName);
}
-
- public void addTable(final String tableName) {
- noPrivilegeTableNames.add(tableName);
- }
-
- public Set<String> getNoPrivilegeTableNames() {
- return noPrivilegeTableNames;
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 84976cdab57..598eeaed0da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -85,7 +85,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
+ RamUsageEstimator.shallowSizeOfInstance(WALEntryHandler.class)
+ RamUsageEstimator.shallowSizeOfInstance(WALEntryPosition.class)
+ RamUsageEstimator.shallowSizeOfInstance(AtomicInteger.class)
- + RamUsageEstimator.shallowSizeOfInstance(AtomicBoolean.class);
+ + RamUsageEstimator.shallowSizeOfInstance(AtomicBoolean.class)
+ + RamUsageEstimator.shallowSizeOf(Boolean.class);
private static final long SET_SIZE =
RamUsageEstimator.shallowSizeOfInstance(HashSet.class);
private final WALEntryHandler walEntryHandler;
@@ -579,6 +580,12 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
return INSTANCE_SIZE
+ (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath)
: 0)
+ (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0)
+ + (Objects.nonNull(treeModelDatabaseName)
+ ? RamUsageEstimator.sizeOf(treeModelDatabaseName)
+ : 0)
+ + (Objects.nonNull(tableModelDatabaseName)
+ ? RamUsageEstimator.sizeOf(tableModelDatabaseName)
+ : 0)
+ (Objects.nonNull(tableNames)
? SET_SIZE
+
tableNames.stream().mapToLong(RamUsageEstimator::sizeOf).reduce(0L, Long::sum)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 69076810175..10b664cf4b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -363,8 +363,8 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
}
public long count() {
- final Tablet covertedTablet = shouldParseTimeOrPattern() ?
convertToTablet() : tablet;
- return (long) covertedTablet.getRowSize() *
covertedTablet.getSchemas().size();
+ final Tablet convertedTablet = shouldParseTimeOrPattern() ?
convertToTablet() : tablet;
+ return (long) convertedTablet.getRowSize() *
convertedTablet.getSchemas().size();
}
/////////////////////////// parsePatternOrTime ///////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
index 13c7c0e5c33..713e5da872c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
@@ -142,8 +142,8 @@ public abstract class TabletInsertionEventParser {
final MeasurementSchema[] originMeasurementSchemaList =
insertRowNode.getMeasurementSchemas();
final String[] originColumnNameStringList =
insertRowNode.getMeasurements();
final TsTableColumnCategory[] originColumnCategories =
insertRowNode.getColumnCategories();
- final TSDataType[] originValueColumnDataTypes =
insertRowNode.getDataTypes();
- final Object[] originValueColumns = insertRowNode.getValues();
+ final TSDataType[] originValueDataTypes = insertRowNode.getDataTypes();
+ final Object[] originValues = insertRowNode.getValues();
for (int i = 0; i <
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
@@ -154,12 +154,11 @@ public abstract class TabletInsertionEventParser {
originColumnCategories != null && originColumnCategories[i] != null
? originColumnCategories[i].toTsFileColumnType()
: Tablet.ColumnCategory.FIELD;
- this.valueColumnDataTypes[filteredColumnIndex] =
originValueColumnDataTypes[i];
+ this.valueColumnDataTypes[filteredColumnIndex] =
originValueDataTypes[i];
final BitMap bitMap = new BitMap(this.timestampColumn.length);
- if (Objects.isNull(originValueColumns[i])
- || Objects.isNull(originValueColumnDataTypes[i])) {
+ if (Objects.isNull(originValues[i]) ||
Objects.isNull(originValueDataTypes[i])) {
fillNullValue(
- originValueColumnDataTypes[i],
+ originValueDataTypes[i],
this.valueColumns,
bitMap,
filteredColumnIndex,
@@ -167,8 +166,8 @@ public abstract class TabletInsertionEventParser {
} else {
this.valueColumns[filteredColumnIndex] =
filterValueColumnsByRowIndexList(
- originValueColumnDataTypes[i],
- originValueColumns[i],
+ originValueDataTypes[i],
+ originValues[i],
rowIndexList,
true,
bitMap, // use the output bitmap since there is no bitmap in InsertRowNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 060f62f15cc..19a4913429f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -52,7 +54,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.nio.file.AccessDeniedException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -429,29 +430,52 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
}
@Override
- public void throwIfNoPrivilege() throws IOException {
- if (!isTableModelEvent()) {
- return;
- }
- for (final IDeviceID deviceID : getDeviceSet()) {
- if (!tablePattern.matchesDatabase(getTableModelDatabaseName())
- || !tablePattern.matchesTable(deviceID.getTableName())) {
- continue;
+ public void throwIfNoPrivilege() {
+ try {
+ if (!isTableModelEvent() ||
AuthorityChecker.SUPER_USER.equals(userName)) {
+ return;
+ }
+ if (!waitForTsFileClose()) {
+ LOGGER.info("Temporary tsFile {} detected, will skip its transfer.",
tsFile);
+ return;
}
- if (!Coordinator.getInstance()
- .getAccessControl()
- .checkCanSelectFromTable4Pipe(
- userName,
- new QualifiedObjectName(getTableModelDatabaseName(),
deviceID.getTableName()))) {
- if (skipIfNoPrivileges) {
- shouldParse4Privilege = true;
- } else {
- throw new AccessDeniedException(
- String.format(
- "No privilege for SELECT for user %s at table %s.%s",
- userName, tableModelDatabaseName, deviceID.getTableName()));
+ for (final IDeviceID deviceID : getDeviceSet()) {
+ if (!tablePattern.matchesDatabase(getTableModelDatabaseName())
+ || !tablePattern.matchesTable(deviceID.getTableName())) {
+ continue;
}
+ if (!Coordinator.getInstance()
+ .getAccessControl()
+ .checkCanSelectFromTable4Pipe(
+ userName,
+ new QualifiedObjectName(getTableModelDatabaseName(),
deviceID.getTableName()))) {
+ if (skipIfNoPrivileges) {
+ shouldParse4Privilege = true;
+ } else {
+ throw new AccessDeniedException(
+ String.format(
+ "No privilege for SELECT for user %s at table %s.%s",
+ userName, tableModelDatabaseName,
deviceID.getTableName()));
+ }
+ }
+ }
+ } catch (final AccessDeniedException e) {
+ throw e;
+ } catch (final Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
}
+
+ final String errorMsg =
+ e instanceof InterruptedException
+ ? String.format(
+ "Interrupted when waiting for parsing privilege for TsFile
%s.",
+ resource.getTsFilePath())
+ : String.format(
+ "Parse TsFile %s when checking privilege error. Because: %s",
+ resource.getTsFilePath(), e.getMessage());
+ LOGGER.warn(errorMsg, e);
+ throw new PipeException(errorMsg, e);
}
}
@@ -578,7 +602,7 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
: String.format(
"Parse TsFile %s error. Because: %s",
resource.getTsFilePath(), e.getMessage());
LOGGER.warn(errorMsg, e);
- throw new PipeException(errorMsg);
+ throw new PipeException(errorMsg, e);
}
}
@@ -656,7 +680,7 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
final String errorMsg = String.format("Read TsFile %s error.",
resource.getTsFilePath());
LOGGER.warn(errorMsg, e);
- throw new PipeException(errorMsg);
+ throw new PipeException(errorMsg, e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
index d90f0ad067e..200130f6e14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertion
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResource;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -75,13 +76,17 @@ public class TsFileInsertionEventParserProvider {
tsFile, tablePattern, startTime, endTime, pipeTaskMeta, userName,
sourceEvent);
}
- if (startTime != Long.MIN_VALUE
- || endTime != Long.MAX_VALUE
- || treePattern instanceof IoTDBTreePattern
- && !((IoTDBTreePattern)
treePattern).mayMatchMultipleTimeSeriesInOneDevice()) {
- // 1. If time filter exists, use query here because the scan container may filter it
- // row by row in single page chunk.
- // 2. If the pattern matches only one time series in one device, use query container here
query container here
+ // Use scan container to save memory
+ if ((double)
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes()
+ / PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+ > PipeTsFileResource.MEMORY_SUFFICIENT_THRESHOLD) {
+ return new TsFileInsertionEventScanParser(
+ tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+ }
+
+ if (treePattern instanceof IoTDBTreePattern
+ && !((IoTDBTreePattern)
treePattern).mayMatchMultipleTimeSeriesInOneDevice()) {
+ // If the pattern matches only one time series in one device, use query container here
// because there is no timestamps merge overhead.
//
// Note: We judge prefix pattern as matching multiple timeseries in one device because it's
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index cf5e88a72a6..38dcd652e87 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -82,13 +82,17 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private void extractTabletInsertion(final PipeRealtimeEvent event) {
- if (canNotUseTabletAnyMore(event)) {
+ TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+
+ if (state != TsFileEpoch.State.USING_TSFILE
+ && state != TsFileEpoch.State.USING_BOTH
+ && canNotUseTabletAnyMore(event)) {
event
.getTsFileEpoch()
.migrateState(
this,
- state -> {
- switch (state) {
+ curState -> {
+ switch (curState) {
case EMPTY:
case USING_TSFILE:
return TsFileEpoch.State.USING_TSFILE;
@@ -100,7 +104,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
});
}
- final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+ state = event.getTsFileEpoch().getState(this);
switch (state) {
case USING_TSFILE:
// Ignore the tablet event.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 2cb3a59c399..dd3ca9c5bc0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -49,7 +49,7 @@ public class PipeTsFileResource implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileResource.class);
public static final long TSFILE_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
- private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.7f;
+ public static final float MEMORY_SUFFICIENT_THRESHOLD = 0.7f;
private final File hardlinkOrCopiedFile;
private final boolean isTsFile;