This is an automated email from the ASF dual-hosted git repository.
shoorz pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 79d209b3b3f Revert "Merge branch 'dev/1.3' of https://github.com/apache/iotdb into dev/1.3"
79d209b3b3f is described below
commit 79d209b3b3f5be785fa98826b6c76389d428cd1c
Author: MiniSho <[email protected]>
AuthorDate: Tue Mar 25 19:16:47 2025 +0800
Revert "Merge branch 'dev/1.3' of https://github.com/apache/iotdb into dev/1.3"
This reverts commit e1bd3fd8a43816b0c6748d38f0fba733f3e06b6c, reversing
changes made to f019f2627c6b270e4961b4164badf000674d4d20.
---
.../client/async/CnToDnAsyncRequestType.java | 1 -
.../CnToDnInternalServiceAsyncRequestManager.java | 6 -
.../rpc/DataNodeAsyncRequestRPCHandler.java | 1 -
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 28 +----
.../procedure/PartitionTableAutoCleaner.java | 3 -
.../procedure/TimeoutExecutorThread.java | 2 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 15 ---
.../region/NotifyRegionMigrationProcedure.java | 137 ---------------------
.../impl/region/RegionMigrateProcedure.java | 2 -
.../impl/schema/UnsetTemplateProcedure.java | 74 +++--------
.../state/NotifyRegionMigrationState.java | 24 ----
.../procedure/store/ProcedureFactory.java | 7 --
.../confignode/procedure/store/ProcedureType.java | 1 -
iotdb-core/datanode/pom.xml | 2 +-
.../protocol/opcda/OpcDaServerHandle.java | 5 +-
.../async/IoTDBDataRegionAsyncConnector.java | 29 ++---
.../impl/DataNodeInternalRPCServiceImpl.java | 8 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 10 --
.../template/TemplateInternalRPCUpdateType.java | 1 +
.../iotdb/db/service/RegionMigrateService.java | 18 ---
.../agent/SubscriptionBrokerAgent.java | 10 +-
.../db/subscription/broker/SubscriptionBroker.java | 25 ++--
.../broker/SubscriptionPrefetchingQueue.java | 11 +-
.../SubscriptionDataNodeResourceManager.java | 43 -------
.../resource/log/SubscriptionLogManager.java | 39 ------
.../resource/log/SubscriptionLogStatus.java | 82 ------------
.../db/utils/datastructure/AlignedTVList.java | 7 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 21 +---
.../iotdb/commons/conf/CommonDescriptor.java | 10 --
.../subscription/config/SubscriptionConfig.java | 12 --
.../src/main/thrift/datanode.thrift | 10 --
pom.xml | 13 +-
32 files changed, 66 insertions(+), 591 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index 225d4341913..e5d1515d27e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -39,7 +39,6 @@ public enum CnToDnAsyncRequestType {
CREATE_SCHEMA_REGION,
DELETE_REGION,
RESET_PEER_LIST,
- NOTIFY_REGION_MIGRATION,
UPDATE_REGION_ROUTE_MAP,
CHANGE_REGION_LEADER,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index 59f99fe57cb..1bae93c7399 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -71,7 +71,6 @@ import
org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
@@ -246,11 +245,6 @@ public class CnToDnInternalServiceAsyncRequestManager
CnToDnAsyncRequestType.UPDATE_REGION_ROUTE_MAP,
(req, client, handler) ->
client.updateRegionCache((TRegionRouteReq) req,
(DataNodeTSStatusRPCHandler) handler));
- actionMapBuilder.put(
- CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION,
- (req, client, handler) ->
- client.notifyRegionMigration(
- (TNotifyRegionMigrationReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.CHANGE_REGION_LEADER,
(req, client, handler) ->
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index 6e2a9dc97cc..dc8f14cfaa9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -202,7 +202,6 @@ public abstract class
DataNodeAsyncRequestRPCHandler<Response>
case STOP_REPAIR_DATA:
case LOAD_CONFIGURATION:
case SET_SYSTEM_STATUS:
- case NOTIFY_REGION_MIGRATION:
case UPDATE_REGION_ROUTE_MAP:
case INVALIDATE_SCHEMA_CACHE:
case INVALIDATE_MATCHED_SCHEMA_CACHE:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 0986e50b31a..87ba3affef1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.confignode.manager.pipe.receiver.protocol;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
@@ -284,27 +283,12 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
PrivilegeType.WRITE_SCHEMA.ordinal())
.getStatus();
case SetTTL:
- return Objects.equals(
- configManager
- .getTTLManager()
- .getAllTTL()
- .get(
- String.join(
- String.valueOf(IoTDBConstant.PATH_SEPARATOR),
- ((SetTTLPlan) plan).getPathPattern())),
- ((SetTTLPlan) plan).getTTL())
- ? StatusUtils.OK
- : configManager
- .checkUserPrivileges(
- username,
- ((SetTTLPlan) plan).isDataBase()
- ? Collections.emptyList()
- : Collections.singletonList(
- new PartialPath(((SetTTLPlan)
plan).getPathPattern())),
- ((SetTTLPlan) plan).isDataBase()
- ? PrivilegeType.MANAGE_DATABASE.ordinal()
- : PrivilegeType.WRITE_SCHEMA.ordinal())
- .getStatus();
+ return configManager
+ .checkUserPrivileges(
+ username,
+ Collections.singletonList(new PartialPath(((SetTTLPlan)
plan).getPathPattern())),
+ PrivilegeType.WRITE_SCHEMA.ordinal())
+ .getStatus();
case UpdateTriggerStateInTable:
case DeleteTriggerInTable:
return configManager
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
index ce5d07276db..a4918e6bfb6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
@@ -67,9 +67,6 @@ public class PartitionTableAutoCleaner<Env> extends
InternalProcedure<Env> {
}
}
if (!databaseTTLMap.isEmpty()) {
- LOGGER.info(
- "[PartitionTableCleaner] Periodically activate
PartitionTableAutoCleaner for: {}",
- databaseTTLMap);
// Only clean the partition table when necessary
TTimePartitionSlot currentTimePartitionSlot =
TimePartitionUtils.getCurrentTimePartitionSlot();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 5aaf9a623f5..d4f919c01be 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -100,7 +100,7 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
@Override
public int compareTo(Delayed other) {
- return Long.compare(
+ return Long.compareUnsigned(
this.getDelay(TimeUnit.MILLISECONDS),
other.getDelay(TimeUnit.MILLISECONDS));
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index a21d2349ed9..b6f4fe35d9e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -70,7 +70,6 @@ import
org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
@@ -523,20 +522,6 @@ public class ConfigNodeProcedureEnv {
return req;
}
- public List<TSStatus> notifyRegionMigrationToAllDataNodes(
- TConsensusGroupId consensusGroupId, boolean isStart) {
- final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
- configManager.getNodeManager().getRegisteredDataNodeLocations();
- final TNotifyRegionMigrationReq request =
- new TNotifyRegionMigrationReq(consensusGroupId, isStart);
-
- final DataNodeAsyncRequestContext<TNotifyRegionMigrationReq, TSStatus>
clientHandler =
- new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION, request,
dataNodeLocationMap);
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
- return clientHandler.getResponseList();
- }
-
public void persistRegionGroup(CreateRegionGroupsPlan
createRegionGroupsPlan) {
// Persist the allocation result
try {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
deleted file mode 100644
index 0160e9116f9..00000000000
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.procedure.impl.region;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
-import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
-import org.apache.iotdb.confignode.procedure.state.NotifyRegionMigrationState;
-import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-/** A procedure that notifies all DNs of the ongoing region migration
procedure. */
-public class NotifyRegionMigrationProcedure
- extends RegionOperationProcedure<NotifyRegionMigrationState> {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(NotifyRegionMigrationProcedure.class);
-
- private boolean isStart;
-
- public NotifyRegionMigrationProcedure() {
- super();
- }
-
- public NotifyRegionMigrationProcedure(TConsensusGroupId consensusGroupId,
boolean isStart) {
- super(consensusGroupId);
- this.isStart = isStart;
- }
-
- @Override
- protected Flow executeFromState(ConfigNodeProcedureEnv env,
NotifyRegionMigrationState state)
- throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
- if (regionId == null) {
- return Flow.NO_MORE_STATE;
- }
- try {
- LOGGER.info(
- "[pid{}][NotifyRegionMigration] started, region id is {}.",
getProcId(), regionId);
- env.notifyRegionMigrationToAllDataNodes(regionId, isStart);
- } catch (Exception e) {
- LOGGER.error("[pid{}][NotifyRegionMigration] state {} failed",
getProcId(), state, e);
- return Flow.NO_MORE_STATE;
- }
- LOGGER.info("[pid{}][NotifyRegionMigration] state {} complete",
getProcId(), state);
- return Flow.HAS_MORE_STATE;
- }
-
- @Override
- protected void rollbackState(
- ConfigNodeProcedureEnv configNodeProcedureEnv,
NotifyRegionMigrationState state)
- throws IOException, InterruptedException, ProcedureException {}
-
- @Override
- protected NotifyRegionMigrationState getState(int stateId) {
- return NotifyRegionMigrationState.values()[stateId];
- }
-
- @Override
- protected int getStateId(NotifyRegionMigrationState state) {
- return state.ordinal();
- }
-
- @Override
- protected NotifyRegionMigrationState getInitialState() {
- return NotifyRegionMigrationState.INIT;
- }
-
- @Override
- public void serialize(DataOutputStream stream) throws IOException {
-
stream.writeShort(ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE.getTypeCode());
- super.serialize(stream);
- ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream);
- stream.writeBoolean(isStart);
- }
-
- @Override
- public void deserialize(ByteBuffer byteBuffer) {
- super.deserialize(byteBuffer);
- try {
- regionId =
ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
- isStart = (byteBuffer.get() != (byte) 0);
- } catch (ThriftSerDeException e) {
- LOGGER.error("Error in deserialize {}", this.getClass(), e);
- }
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof NotifyRegionMigrationProcedure)) {
- return false;
- }
- NotifyRegionMigrationProcedure procedure =
(NotifyRegionMigrationProcedure) obj;
- return this.regionId.equals(procedure.regionId) && this.isStart ==
procedure.isStart;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(regionId, isStart);
- }
-
- @Override
- public String toString() {
- return "NotifyRegionMigrationProcedure{"
- + "regionId="
- + regionId
- + ", isStart="
- + isStart
- + '}';
- }
-}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
index 1acdd2ccd54..f4d5dce26b6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
@@ -83,7 +83,6 @@ public class RegionMigrateProcedure extends
RegionOperationProcedure<RegionTrans
regionId,
handler.simplifiedLocation(originalDataNode),
handler.simplifiedLocation(destDataNode));
- addChildProcedure(new NotifyRegionMigrationProcedure(regionId,
true));
setNextState(RegionTransitionState.ADD_REGION_PEER);
break;
case ADD_REGION_PEER:
@@ -125,7 +124,6 @@ public class RegionMigrateProcedure extends
RegionOperationProcedure<RegionTrans
CommonDateTimeUtils.convertMillisecondToDurationStr(
System.currentTimeMillis() - getSubmittedTime()),
DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
- addChildProcedure(new NotifyRegionMigrationProcedure(regionId,
false));
return Flow.NO_MORE_STATE;
default:
throw new ProcedureException("Unsupported state: " + state.name());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
index 5233bb7206e..81405960885 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
@@ -144,6 +144,9 @@ public class UnsetTemplateProcedure
private void invalidateCache(final ConfigNodeProcedureEnv env) {
try {
+ // Cannot roll back after cache invalidation
+ // Because we do not know whether there are time series successfully
created
+ alreadyRollback = true;
executeInvalidateCache(env);
setNextState(UnsetTemplateState.CHECK_DATANODE_TEMPLATE_ACTIVATION);
} catch (final ProcedureException e) {
@@ -214,26 +217,21 @@ public class UnsetTemplateProcedure
}
alreadyRollback = true;
ProcedureException rollbackException;
- try {
- executeRollbackInvalidateCache(env);
- final TSStatus status =
- env.getConfigManager()
- .getClusterSchemaManager()
- .rollbackPreUnsetSchemaTemplate(template.getId(), path);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return;
- } else {
- LOGGER.error(
- "Failed to rollback pre unset template operation of template {}
set on {}",
- template.getName(),
- path);
- rollbackException =
- new ProcedureException(
- new MetadataException(
- "Rollback template pre unset failed because of" +
status.getMessage()));
- }
- } catch (final ProcedureException e) {
- rollbackException = e;
+ final TSStatus status =
+ env.getConfigManager()
+ .getClusterSchemaManager()
+ .rollbackPreUnsetSchemaTemplate(template.getId(), path);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return;
+ } else {
+ LOGGER.error(
+ "Failed to rollback pre unset template operation of template {} set
on {}",
+ template.getName(),
+ path);
+ rollbackException =
+ new ProcedureException(
+ new MetadataException(
+ "Rollback template pre unset failed because of" +
status.getMessage()));
}
try {
executeInvalidateCache(env);
@@ -246,42 +244,6 @@ public class UnsetTemplateProcedure
}
}
- private void executeRollbackInvalidateCache(ConfigNodeProcedureEnv env)
- throws ProcedureException {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
- TUpdateTemplateReq rollbackTemplateSetInfoReq = new TUpdateTemplateReq();
- rollbackTemplateSetInfoReq.setType(
-
TemplateInternalRPCUpdateType.ROLLBACK_INVALIDATE_TEMPLATE_SET_INFO.toByte());
- rollbackTemplateSetInfoReq.setTemplateInfo(getAddTemplateSetInfo());
- DataNodeAsyncRequestContext<TUpdateTemplateReq, TSStatus> clientHandler =
- new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.UPDATE_TEMPLATE,
- rollbackTemplateSetInfoReq,
- dataNodeLocationMap);
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
- Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
- for (TSStatus status : statusMap.values()) {
- // all dataNodes must clear the related template cache
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error(
- "Failed to rollback template cache of template {} set on {}",
template.getName(), path);
- throw new ProcedureException(new MetadataException("Rollback template
cache failed"));
- }
- }
- }
-
- private ByteBuffer getAddTemplateSetInfo() {
- if (this.addTemplateSetInfo == null) {
- this.addTemplateSetInfo =
- ByteBuffer.wrap(
- TemplateInternalRPCUtil.generateAddTemplateSetInfoBytes(
- template, path.getFullPath()));
- }
-
- return addTemplateSetInfo;
- }
-
@Override
protected boolean isRollbackSupported(final UnsetTemplateState
unsetTemplateState) {
return true;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
deleted file mode 100644
index 1b964621e5b..00000000000
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.procedure.state;
-
-public enum NotifyRegionMigrationState {
- INIT,
-}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index fc88b54f3d5..e5d43137fac 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -40,7 +40,6 @@ import
org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2
import
org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
import
org.apache.iotdb.confignode.procedure.impl.region.AddRegionPeerProcedure;
import
org.apache.iotdb.confignode.procedure.impl.region.CreateRegionGroupsProcedure;
-import
org.apache.iotdb.confignode.procedure.impl.region.NotifyRegionMigrationProcedure;
import
org.apache.iotdb.confignode.procedure.impl.region.ReconstructRegionProcedure;
import
org.apache.iotdb.confignode.procedure.impl.region.RegionMigrateProcedure;
import
org.apache.iotdb.confignode.procedure.impl.region.RemoveRegionPeerProcedure;
@@ -121,10 +120,6 @@ public class ProcedureFactory implements IProcedureFactory
{
break;
case RECONSTRUCT_REGION_PROCEDURE:
procedure = new ReconstructRegionProcedure();
- break;
- case NOTIFY_REGION_MIGRATION_PROCEDURE:
- procedure = new NotifyRegionMigrationProcedure();
- break;
case DELETE_TIMESERIES_PROCEDURE:
procedure = new DeleteTimeSeriesProcedure(false);
break;
@@ -323,8 +318,6 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.DELETE_TIMESERIES_PROCEDURE;
} else if (procedure instanceof ReconstructRegionProcedure) {
return ProcedureType.RECONSTRUCT_REGION_PROCEDURE;
- } else if (procedure instanceof NotifyRegionMigrationProcedure) {
- return ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE;
} else if (procedure instanceof CreateTriggerProcedure) {
return ProcedureType.CREATE_TRIGGER_PROCEDURE;
} else if (procedure instanceof DropTriggerProcedure) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 48ccca42d44..82b28186756 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -39,7 +39,6 @@ public enum ProcedureType {
RECONSTRUCT_REGION_PROCEDURE((short) 203),
ADD_REGION_PEER_PROCEDURE((short) 204),
REMOVE_REGION_PEER_PROCEDURE((short) 205),
- NOTIFY_REGION_MIGRATION_PROCEDURE((short) 206),
@TestOnly
CREATE_MANY_DATABASES_PROCEDURE((short) 250),
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index b97a532249a..ae4ae4c62b2 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -262,7 +262,7 @@
<artifactId>jersey-container-servlet-core</artifactId>
</dependency>
<dependency>
- <groupId>com.github.moquette-io.moquette</groupId>
+ <groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
</dependency>
<dependency>
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
index 4f5c6e58a96..3560bd26d3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
@@ -65,6 +65,7 @@ public class OpcDaServerHandle implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(OpcDaServerHandle.class);
+ private final PointerByReference ppvServer = new PointerByReference();
private final OpcDaHeader.IOPCServer opcServer;
private final OpcDaHeader.IOPCItemMgt itemMgt;
private final OpcDaHeader.IOPCSyncIO syncIO;
@@ -267,7 +268,6 @@ public class OpcDaServerHandle implements Closeable {
// Free after write
if (Objects.nonNull(bstr)) {
OleAuto.INSTANCE.SysFreeString(bstr);
- bstr = null;
}
final Pointer pErrors = ppErrors.getValue();
@@ -369,6 +369,9 @@ public class OpcDaServerHandle implements Closeable {
serverHandleMap.clear();
// Release resource
+ if (Objects.nonNull(ppvServer.getValue())) {
+ Ole32.INSTANCE.CoTaskMemFree(ppvServer.getValue());
+ }
if (Objects.nonNull(syncIO)) {
syncIO.Release();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index b92c066e8be..bf0b8df2d6c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -199,24 +199,17 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
final AtomicInteger eventsReferenceCount = new
AtomicInteger(sealedFiles.size());
final AtomicBoolean eventsHadBeenAddedToRetryQueue = new
AtomicBoolean(false);
- try {
- for (final File sealedFile : sealedFiles) {
- transfer(
- new PipeTransferTsFileHandler(
- this,
- pipe2WeightMap,
- events,
- eventsReferenceCount,
- eventsHadBeenAddedToRetryQueue,
- sealedFile,
- null,
- false));
- }
- } catch (final Throwable t) {
- LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, t);
- if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
- addFailureEventsToRetryQueue(events);
- }
+ for (final File sealedFile : sealedFiles) {
+ transfer(
+ new PipeTransferTsFileHandler(
+ this,
+ pipe2WeightMap,
+ events,
+ eventsReferenceCount,
+ eventsHadBeenAddedToRetryQueue,
+ sealedFile,
+ null,
+ false));
}
} else {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d0924477154..c5e92c754ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -207,7 +207,6 @@ import
org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
-import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
@@ -1934,6 +1933,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus updateTemplate(final TUpdateTemplateReq req) {
switch (TemplateInternalRPCUpdateType.getType(req.type)) {
+ // Reserved for rolling upgrade
case ROLLBACK_INVALIDATE_TEMPLATE_SET_INFO:
ClusterTemplateManager.getInstance().addTemplateSetInfo(req.getTemplateInfo());
break;
@@ -2145,12 +2145,6 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return RegionMigrateService.getInstance().getRegionMaintainResult(taskId);
}
- @Override
- public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws
TException {
- RegionMigrateService.getInstance().notifyRegionMigration(req);
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- }
-
private TSStatus createNewRegion(ConsensusGroupId regionId, String
storageGroup) {
return regionManager.createNewRegion(regionId, storageGroup);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 0af4d91adfe..04d38f25f89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -56,7 +56,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePie
import
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -183,8 +182,6 @@ public class LoadTsFileScheduler implements IScheduler {
}
shouldRemoveFileFromLoadingSet = true;
- final long startTimeMs = System.currentTimeMillis();
-
if (node.isTsFileEmpty()) {
LOGGER.info("Load skip TsFile {}, because it has no data.",
filePath);
} else if (!node.needDecodeTsFile(
@@ -229,13 +226,6 @@ public class LoadTsFileScheduler implements IScheduler {
}
}
- if (RegionMigrateService.getInstance().getLastNotifyTime() >
startTimeMs) {
- LOGGER.warn(
- "LoadTsFileScheduler: Region migration started or ended during
loading TsFile {}, will convert to insertion to avoid data loss",
- filePath);
- isLoadSingleTsFileSuccess = false;
- }
-
if (isLoadSingleTsFileSuccess) {
node.clean();
LOGGER.info(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/TemplateInternalRPCUpdateType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/TemplateInternalRPCUpdateType.java
index 77193913044..ebe730114e2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/TemplateInternalRPCUpdateType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/TemplateInternalRPCUpdateType.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
public enum TemplateInternalRPCUpdateType {
+ // Deprecated
ROLLBACK_INVALIDATE_TEMPLATE_SET_INFO((byte) 0),
INVALIDATE_TEMPLATE_SET_INFO((byte) 1),
ADD_TEMPLATE_PRE_SET_INFO((byte) 2),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 05328ea91b9..056df802c13 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
-import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -54,7 +53,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class RegionMigrateService implements IService {
@@ -74,9 +72,6 @@ public class RegionMigrateService implements IService {
// where different asynchronous tasks are submitted to the same datanode
within a single procedure
private static final ConcurrentHashMap<Long, TRegionMigrateResult>
taskResultMap =
new ConcurrentHashMap<>();
-
- private static final AtomicLong lastNotifyTime = new
AtomicLong(Long.MIN_VALUE);
-
private static final TRegionMigrateResult unfinishedResult = new
TRegionMigrateResult();
private RegionMigrateService() {}
@@ -85,19 +80,6 @@ public class RegionMigrateService implements IService {
return Holder.INSTANCE;
}
- public void notifyRegionMigration(TNotifyRegionMigrationReq req) {
- lastNotifyTime.set(System.currentTimeMillis());
- if (req.isIsStart()) {
- LOGGER.info("Region {} is notified to begin migrating",
req.getRegionId());
- } else {
- LOGGER.info("Region {} is notified to finish migrating",
req.getRegionId());
- }
- }
-
- public long getLastNotifyTime() {
- return lastNotifyTime.get();
- }
-
/**
* Submit AddRegionPeerTask
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index f1ee9f8867b..6d70d5796ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription.agent;
import org.apache.iotdb.db.subscription.broker.SubscriptionBroker;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-import
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
import
org.apache.iotdb.db.subscription.task.subtask.SubscriptionConnectorSubtask;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -198,13 +197,8 @@ public class SubscriptionBrokerAgent {
public boolean executePrefetch(final String consumerGroupId, final String
topicName) {
final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
- SubscriptionDataNodeResourceManager.log()
- .schedule(SubscriptionBrokerAgent.class, consumerGroupId, topicName)
- .ifPresent(
- l ->
- l.warn(
- "Subscription: broker bound to consumer group [{}] does
not exist",
- consumerGroupId));
+ LOGGER.warn(
+ "Subscription: broker bound to consumer group [{}] does not exist",
consumerGroupId);
return false;
}
return broker.executePrefetch(topicName);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 1223672810d..df888ec1b03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
-import
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -420,25 +419,17 @@ public class SubscriptionBroker {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
- SubscriptionDataNodeResourceManager.log()
- .schedule(SubscriptionBroker.class, brokerId, topicName)
- .ifPresent(
- l ->
- l.warn(
- "Subscription: prefetching queue bound to topic [{}] for
consumer group [{}] does not exist",
- topicName,
- brokerId));
+ LOGGER.warn(
+ "Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] does not exist",
+ topicName,
+ brokerId);
return false;
}
if (prefetchingQueue.isClosed()) {
- SubscriptionDataNodeResourceManager.log()
- .schedule(SubscriptionBroker.class, brokerId, topicName)
- .ifPresent(
- l ->
- l.warn(
- "Subscription: prefetching queue bound to topic [{}] for
consumer group [{}] is closed",
- topicName,
- brokerId));
+ LOGGER.warn(
+ "Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] is closed",
+ topicName,
+ brokerId);
return false;
}
return prefetchingQueue.executePrefetch();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index abd3e1c300c..0514ac0075e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -29,7 +29,6 @@ import
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
-import
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
import
org.apache.iotdb.db.subscription.task.subtask.SubscriptionReceiverSubtask;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -98,6 +97,9 @@ public abstract class SubscriptionPrefetchingQueue {
private final SubscriptionPrefetchingQueueStates states;
+ private static final long STATE_REPORT_INTERVAL_IN_MS = 10_000L;
+ private long lastStateReportTimestamp = System.currentTimeMillis();
+
private volatile boolean isCompleted = false;
private volatile boolean isClosed = false;
@@ -278,9 +280,10 @@ public abstract class SubscriptionPrefetchingQueue {
}
private void reportStateIfNeeded() {
- SubscriptionDataNodeResourceManager.log()
- .schedule(SubscriptionPrefetchingQueue.class, brokerId, topicName)
- .ifPresent(l -> l.info("Subscription: SubscriptionPrefetchingQueue
state {}", this));
+ if (System.currentTimeMillis() - lastStateReportTimestamp >
STATE_REPORT_INTERVAL_IN_MS) {
+ LOGGER.info("Subscription: SubscriptionPrefetchingQueue state {}", this);
+ lastStateReportTimestamp = System.currentTimeMillis();
+ }
}
@SafeVarargs
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
deleted file mode 100644
index 347299df10f..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.subscription.resource;
-
-import org.apache.iotdb.db.subscription.resource.log.SubscriptionLogManager;
-
-public class SubscriptionDataNodeResourceManager {
-
- private final SubscriptionLogManager subscriptionLogManager;
-
- public static SubscriptionLogManager log() {
- return
SubscriptionDataNodeResourceManagerHolder.INSTANCE.subscriptionLogManager;
- }
-
- ///////////////////////////// SINGLETON /////////////////////////////
-
- private SubscriptionDataNodeResourceManager() {
- subscriptionLogManager = new SubscriptionLogManager();
- }
-
- private static class SubscriptionDataNodeResourceManagerHolder {
-
- private static final SubscriptionDataNodeResourceManager INSTANCE =
- new SubscriptionDataNodeResourceManager();
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
deleted file mode 100644
index 549f9344d7f..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.subscription.resource.log;
-
-import org.slf4j.Logger;
-
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class SubscriptionLogManager {
-
- private final ConcurrentMap<Class<?>, SubscriptionLogStatus>
logClass2LogStatusMap =
- new ConcurrentHashMap<>();
-
- public Optional<Logger> schedule(
- final Class<?> logClass, final String consumerGroupId, final String
topicName) {
- return logClass2LogStatusMap
- .computeIfAbsent(logClass, k -> new SubscriptionLogStatus(logClass))
- .schedule(consumerGroupId, topicName);
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
deleted file mode 100644
index 0daae1dc937..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.subscription.resource.log;
-
-import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
-import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.tsfile.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-class SubscriptionLogStatus {
-
- private static final long BASE_INTERVAL_IN_MS =
-
SubscriptionConfig.getInstance().getSubscriptionLogManagerBaseIntervalMs();
-
- private final Logger logger;
- private final Cache<Pair<String, String>, AtomicLong> lastReportTimestamps;
-
- public SubscriptionLogStatus(final Class<?> logClass) {
- this.logger = LoggerFactory.getLogger(logClass);
- this.lastReportTimestamps =
- Caffeine.newBuilder()
- .expireAfterAccess(
-
SubscriptionConfig.getInstance().getSubscriptionLogManagerWindowSeconds(),
- TimeUnit.SECONDS)
- .build();
- }
-
- public Optional<Logger> schedule(final String consumerGroupId, final String
topicName) {
- final Pair<String, String> key = new Pair<>(consumerGroupId, topicName);
- final long now = System.currentTimeMillis();
- // Calculate the allowed logging interval based on the current prefetching
queue count
- final int count = SubscriptionAgent.broker().getPrefetchingQueueCount();
- final long allowedInterval = BASE_INTERVAL_IN_MS * count;
- // If the key does not exist, initialize an AtomicLong set to one interval
before now
- final AtomicLong lastTime =
- Objects.requireNonNull(
- lastReportTimestamps.get(
- key,
- k ->
- new AtomicLong(
- now
- // introduce randomness
- - BASE_INTERVAL_IN_MS
- * ThreadLocalRandom.current().nextLong(1,
count + 1))));
- final long last = lastTime.get();
- if (now - last >= allowedInterval) {
- // Use compareAndSet to ensure that only one thread updates at a time,
- // so that only one log entry is printed per allowed interval
- if (lastTime.compareAndSet(last, now)) {
- return Optional.of(logger);
- }
- }
- return Optional.empty();
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index f379f236290..0b9f4d951b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -1288,11 +1288,8 @@ public abstract class AlignedTVList extends TVList {
if (largestBinaryChunkSize == 0) {
return largestPrimitivePointSize;
}
- int columnValueCnt = getColumnValueCnt(largestBinaryColumnIndex);
- if (columnValueCnt == 0) {
- return largestPrimitivePointSize;
- }
- int avgPointSizeOfLargestBinaryColumn = (int) largestBinaryChunkSize /
columnValueCnt;
+ int avgPointSizeOfLargestBinaryColumn =
+ (int) largestBinaryChunkSize /
getColumnValueCnt(largestBinaryColumnIndex);
return Math.max(avgPointSizeOfLargestBinaryColumn,
largestPrimitivePointSize);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 99e041cdfb4..4b814a26386 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -308,7 +308,7 @@ public class CommonConfig {
private int subscriptionPollMaxBlockingTimeMs = 500;
private int subscriptionDefaultTimeoutInMs = 10_000; // 10s
private long subscriptionLaunchRetryIntervalMs = 1000;
- private int subscriptionRecycleUncommittedEventIntervalMs = 600_000; // 600s
+ private int subscriptionRecycleUncommittedEventIntervalMs = 600000; // 600s
private long subscriptionReadFileBufferSize = 8 * MB;
private long subscriptionReadTabletBufferSize = 8 * MB;
private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
@@ -316,8 +316,6 @@ public class CommonConfig {
private long subscriptionEstimatedInsertNodeTabletInsertionEventSize = 64 *
KB;
private long subscriptionEstimatedRawTabletInsertionEventSize = 16 * KB;
private long subscriptionMaxAllowedEventCountInTabletBatch = 100;
- private long subscriptionLogManagerWindowSeconds = 120; // 120s
- private long subscriptionLogManagerBaseIntervalMs = 1_000; // 1s
private boolean subscriptionPrefetchEnabled = false;
private float subscriptionPrefetchMemoryThreshold = 0.5F;
@@ -1510,23 +1508,6 @@ public class CommonConfig {
subscriptionMaxAllowedEventCountInTabletBatch;
}
- public long getSubscriptionLogManagerWindowSeconds() {
- return subscriptionLogManagerWindowSeconds;
- }
-
- public void setSubscriptionLogManagerWindowSeconds(long
subscriptionLogManagerWindowSeconds) {
- this.subscriptionLogManagerWindowSeconds =
subscriptionLogManagerWindowSeconds;
- }
-
- public long getSubscriptionLogManagerBaseIntervalMs() {
- return subscriptionLogManagerBaseIntervalMs;
- }
-
- public void setSubscriptionLogManagerBaseIntervalMs(
- final long subscriptionLogManagerBaseIntervalMs) {
- this.subscriptionLogManagerBaseIntervalMs =
subscriptionLogManagerBaseIntervalMs;
- }
-
public boolean getSubscriptionPrefetchEnabled() {
return subscriptionPrefetchEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index c6b7013b23e..b3613bb6677 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -776,16 +776,6 @@ public class CommonDescriptor {
properties.getProperty(
"subscription_max_allowed_event_count_in_tablet_batch",
String.valueOf(config.getSubscriptionMaxAllowedEventCountInTabletBatch()))));
- config.setSubscriptionLogManagerWindowSeconds(
- Long.parseLong(
- properties.getProperty(
- "subscription_log_manager_window_seconds",
-
String.valueOf(config.getSubscriptionLogManagerWindowSeconds()))));
- config.setSubscriptionLogManagerBaseIntervalMs(
- Long.parseLong(
- properties.getProperty(
- "subscription_log_manager_base_interval_ms",
-
String.valueOf(config.getSubscriptionLogManagerBaseIntervalMs()))));
config.setSubscriptionPrefetchEnabled(
Boolean.parseBoolean(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index a332b87f0ed..1ace6e71de8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -97,14 +97,6 @@ public class SubscriptionConfig {
return COMMON_CONFIG.getSubscriptionMaxAllowedEventCountInTabletBatch();
}
- public long getSubscriptionLogManagerWindowSeconds() {
- return COMMON_CONFIG.getSubscriptionLogManagerWindowSeconds();
- }
-
- public long getSubscriptionLogManagerBaseIntervalMs() {
- return COMMON_CONFIG.getSubscriptionLogManagerBaseIntervalMs();
- }
-
public boolean getSubscriptionPrefetchEnabled() {
return COMMON_CONFIG.getSubscriptionPrefetchEnabled();
}
@@ -179,10 +171,6 @@ public class SubscriptionConfig {
LOGGER.info(
"SubscriptionMaxAllowedEventCountInTabletBatch: {}",
getSubscriptionMaxAllowedEventCountInTabletBatch());
- LOGGER.info(
- "SubscriptionLogManagerWindowSeconds: {}",
getSubscriptionLogManagerWindowSeconds());
- LOGGER.info(
- "SubscriptionLogManagerBaseIntervalMs: {}",
getSubscriptionLogManagerBaseIntervalMs());
LOGGER.info("SubscriptionPrefetchEnabled: {}",
getSubscriptionPrefetchEnabled());
LOGGER.info(
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index bef96496650..4b25e08d1f6 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -52,11 +52,6 @@ struct TRegionMigrateResult {
4: required common.TRegionMaintainTaskStatus taskStatus
}
-struct TNotifyRegionMigrationReq {
- 1: required common.TConsensusGroupId regionId
- 2: required bool isStart
-}
-
struct TCreatePeerReq {
1: required common.TConsensusGroupId regionId
2: required list<common.TDataNodeLocation> regionLocations
@@ -786,11 +781,6 @@ service IDataNodeRPCService {
*/
TRegionMigrateResult getRegionMaintainResult(i64 taskId)
- /**
- * Notify the DataNode of the beginning or ending the migration of the
specified RegionGroup
- */
- common.TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req)
-
/**
* Config node will clean DataNode cache, the Data node will not accept
read/write request when disabled
* @param data node location
diff --git a/pom.xml b/pom.xml
index c5dd2933b25..8d8920a5dba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,13 +21,6 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <!--Add the JitPack repository because the moquette-broker dependency
needs to be resolved from the JitPack repository-->
- <repositories>
- <repository>
- <id>jitpack.io</id>
- <url>https://jitpack.io</url>
- </repository>
- </repositories>
<parent>
<groupId>org.apache</groupId>
<artifactId>apache</artifactId>
@@ -128,7 +121,7 @@
<mockito.version>2.23.4</mockito.version>
<!-- This was the last version to support Java 8 -->
<!--mockito.version>4.11.0</mockito.version-->
- <moquette.version>0.18.0</moquette.version>
+ <moquette.version>0.17</moquette.version>
<netty.version>4.1.115.Final</netty.version>
<nimbus-jose-jwt.version>9.37.3</nimbus-jose-jwt.version>
<oauth2-oidc-sdk.version>10.15</oauth2-oidc-sdk.version>
@@ -175,7 +168,7 @@
<thrift.version>0.14.1</thrift.version>
<xz.version>1.9</xz.version>
<zstd-jni.version>1.5.6-3</zstd-jni.version>
- <tsfile.version>1.1.1-250324-SNAPSHOT</tsfile.version>
+ <tsfile.version>1.1.0-250219-SNAPSHOT</tsfile.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
@@ -439,7 +432,7 @@
<version>${reflections.version}</version>
</dependency>
<dependency>
- <groupId>com.github.moquette-io.moquette</groupId>
+ <groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>${moquette.version}</version>
</dependency>