This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch jira_1376_0.12 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 47c1527de1e3e41706877c67e3c5b2e2a3502b83 Author: jt2594838 <[email protected]> AuthorDate: Wed May 12 22:43:48 2021 +0800 properly handle BatchProcessException --- .../iotdb/cluster/log/applier/BaseApplier.java | 37 ++++++++++++- .../cluster/log/applier/DataLogApplierTest.java | 60 +++++++++++++++++----- .../engine/storagegroup/StorageGroupProcessor.java | 8 ++- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 7 +-- .../org/apache/iotdb/db/qp/physical/BatchPlan.java | 47 +++++++++++++++++ .../db/qp/physical/crud/InsertMultiTabletPlan.java | 39 +++++++++++++- .../physical/crud/InsertRowsOfOneDevicePlan.java | 33 +++++++++++- .../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 39 +++++++++++++- .../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 39 +++++++++++++- 9 files changed, 284 insertions(+), 25 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java index 460ba6e..a00c9c6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java @@ -34,10 +34,13 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.executor.PlanExecutor; +import org.apache.iotdb.db.qp.physical.BatchPlan; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.SchemaUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,11 +74,11 @@ abstract class BaseApplier implements LogApplier { } else if (plan != null && !plan.isQuery()) { try { getQueryExecutor().processNonQuery(plan); + } catch (BatchProcessException e) { + handleBatchProcessException(e, plan); } catch (QueryProcessException e) { if (e.getCause() instanceof StorageGroupNotSetException) { executeAfterSync(plan); - } else if (e instanceof BatchProcessException) { - logger.warn("Exception occurred while processing non-query. ", e); } else { throw e; } @@ -87,6 +90,36 @@ abstract class BaseApplier implements LogApplier { } } + private void handleBatchProcessException(BatchProcessException e, PhysicalPlan plan) + throws QueryProcessException, StorageEngineException, StorageGroupNotSetException { + TSStatus[] failingStatus = e.getFailingStatus(); + for (int i = 0; i < failingStatus.length; i++) { + TSStatus status = failingStatus[i]; + // skip succeeded plans in later execution + if (status != null + && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + && plan instanceof BatchPlan) { + ((BatchPlan) plan).setIsExecuted(i); + } + } + boolean needRetry = false; + for (int i = 0, failingStatusLength = failingStatus.length; i < failingStatusLength; i++) { + TSStatus status = failingStatus[i]; + if (status != null + && (status.getCode() == TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.UNDEFINED_TEMPLATE.getStatusCode()) + && plan instanceof BatchPlan) { + ((BatchPlan) plan).unsetIsExecuted(i); + needRetry = true; + } + } + if (needRetry) { + executeAfterSync(plan); + return; + } + throw e; + } + private void executeAfterSync(PhysicalPlan plan) throws QueryProcessException, StorageGroupNotSetException, StorageEngineException { try { diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java index af6272e..cfee1bb 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java @@ -19,6 +19,19 @@ package org.apache.iotdb.cluster.log.applier; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import junit.framework.TestCase; import org.apache.iotdb.cluster.client.DataClientProvider; import org.apache.iotdb.cluster.client.async.AsyncDataClient; import org.apache.iotdb.cluster.common.IoTDBTest; @@ -58,41 +71,39 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.FlushPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; - -import junit.framework.TestCase; import org.apache.thrift.async.AsyncMethodCallback; import org.apache.thrift.protocol.TBinaryProtocol.Factory; import org.junit.After; import org.junit.Before; import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DataLogApplierTest extends IoTDBTest { + private static final Logger logger = LoggerFactory.getLogger(DataLogApplierTest.class); private boolean partialWriteEnabled; private TestMetaGroupMember testMetaGroupMember = new TestMetaGroupMember() { @Override public boolean syncLeader(RaftMember.CheckConsistency checkConsistency) { + try { + // for testApplyCreateMultiTimeseiresWithPulling() + IoTDB.metaManager.setStorageGroup(new PartialPath("root.sg2")); + } catch (MetadataException e) { + logger.error("Cannot set sg for test", e); + } return true; } @@ -338,4 +349,25 @@ public class DataLogApplierTest extends IoTDBTest { "Storage group is not set for current seriesPath: [root.test20]", log.getException().getMessage()); } + + @Test + public void testApplyCreateMultiTimeseiresWithPulling() throws MetadataException { + IoTDB.metaManager.setStorageGroup(new PartialPath("root.sg1")); + CreateMultiTimeSeriesPlan multiTimeSeriesPlan = new CreateMultiTimeSeriesPlan(); + multiTimeSeriesPlan.setPaths( + Arrays.asList( + new PartialPath("root.sg1.s1"), + // root.sg2 should be pulled + new PartialPath("root.sg2.s1"))); + multiTimeSeriesPlan.setCompressors( + Arrays.asList(CompressionType.UNCOMPRESSED, CompressionType.UNCOMPRESSED)); + multiTimeSeriesPlan.setDataTypes(Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE)); + multiTimeSeriesPlan.setEncodings(Arrays.asList(TSEncoding.GORILLA, TSEncoding.GORILLA)); + + PhysicalPlanLog log = new PhysicalPlanLog(multiTimeSeriesPlan); + // the applier should sync meta leader to get root.sg2 and report no error + applier.apply(log); + assertTrue(IoTDB.metaManager.getAllStorageGroupPaths().contains(new PartialPath("root.sg2"))); + assertNull(log.getException()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 33bc6c3..ed8e52e 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -2859,9 +2859,13 @@ public class StorageGroupProcessor { writeLock(); try { boolean isSequence = false; - for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) { - if (!isAlive(plan.getTime())) { + InsertRowPlan[] rowPlans = insertRowsOfOneDevicePlan.getRowPlans(); + for (int i = 0, rowPlansLength = rowPlans.length; i < rowPlansLength; i++) { + + InsertRowPlan plan = rowPlans[i]; + if (!isAlive(plan.getTime()) || insertRowsOfOneDevicePlan.isExecuted(i)) { // we do not need to write these part of data, as they can not be queried + // or the sub-plan has already been executed, we are retrying other sub-plans continue; } // init map diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index 55f9ae4..c3156de 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -1209,7 +1209,7 @@ public class PlanExecutor implements IPlanExecutor { @Override public void insert(InsertRowsPlan plan) throws QueryProcessException { for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) { - if (plan.getResults().containsKey(i)) { + if (plan.getResults().containsKey(i) || plan.isExecuted(i)) { continue; } try { @@ -1253,7 +1253,8 @@ public class PlanExecutor implements IPlanExecutor { public void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan) throws QueryProcessException { for (int i = 0; i < insertMultiTabletPlan.getInsertTabletPlanList().size(); i++) { - if (insertMultiTabletPlan.getResults().containsKey(i)) { + if (insertMultiTabletPlan.getResults().containsKey(i) + || insertMultiTabletPlan.isExecuted(i)) { continue; } insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i)); @@ -1369,7 +1370,7 @@ public class PlanExecutor implements IPlanExecutor { private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan multiPlan) throws BatchProcessException { for (int i = 0; i < multiPlan.getPaths().size(); i++) { - if (multiPlan.getResults().containsKey(i)) { + if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) { continue; } CreateTimeSeriesPlan plan = diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java new file mode 100644 index 0000000..4319c82 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.qp.physical; + +/** BatchPlan contains multiple sub-plans. */ +public interface BatchPlan { + + /** + * Mark the sub-plan at position i as executed. + * + * @param i + */ + void setIsExecuted(int i); + + /** + * Mark the sub-plan at position i as not executed. + * + * @param i + */ + void unsetIsExecuted(int i); + + /** + * @return whether the sub-plan at position i has been executed. + * @param i + */ + boolean isExecuted(int i); + + /** @return how many sub-plans are in the plan. */ + int getBatchSize(); +} diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java index ba552b0..1883823 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; +import org.apache.iotdb.db.qp.physical.BatchPlan; import org.apache.iotdb.service.rpc.thrift.TSStatus; import java.io.DataOutputStream; @@ -39,7 +40,7 @@ import java.util.TreeMap; * reduce the number of raft logs. For details, please refer to * https://issues.apache.org/jira/browse/IOTDB-1099 */ -public class InsertMultiTabletPlan extends InsertPlan { +public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan { /** * the value is used to indict the parent InsertTabletPlan's index when the parent @@ -90,6 +91,8 @@ public class InsertMultiTabletPlan extends InsertPlan { /** record the result of creation of time series */ private Map<Integer, TSStatus> results = new TreeMap<>(); + boolean[] isExecuted; + public InsertMultiTabletPlan() { super(OperatorType.MULTI_BATCH_INSERT); this.insertTabletPlanList = new ArrayList<>(); @@ -327,4 +330,38 @@ public class InsertMultiTabletPlan extends InsertPlan { : 0); return result; } + + @Override + public void setIsExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + isExecuted[i] = true; + } + + @Override + public boolean isExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + return isExecuted[i]; + } + + @Override + public int getBatchSize() { + return insertTabletPlanList.size(); + } + + @Override + public void unsetIsExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + isExecuted[i] = false; + if (parentInsertTabletPlanIndexList != null) { + results.remove(getParentIndex(i)); + } else { + results.remove(i); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java index 41509bd..8886688 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; +import org.apache.iotdb.db.qp.physical.BatchPlan; import java.io.DataOutputStream; import java.io.IOException; @@ -31,8 +32,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -public class InsertRowsOfOneDevicePlan extends InsertPlan { +public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan { + boolean[] isExecuted; private InsertRowPlan[] rowPlans; public InsertRowsOfOneDevicePlan( @@ -163,4 +165,33 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan { public InsertRowPlan[] getRowPlans() { return rowPlans; } + + @Override + public void setIsExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + isExecuted[i] = true; + } + + @Override + public boolean isExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + return isExecuted[i]; + } + + @Override + public int getBatchSize() { + return rowPlans.length; + } + + @Override + public void unsetIsExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + isExecuted[i] = false; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java index d5122cf..8bcfe32 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; +import org.apache.iotdb.db.qp.physical.BatchPlan; import org.apache.iotdb.db.utils.StatusUtils; import org.apache.iotdb.service.rpc.thrift.TSStatus; @@ -34,7 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -public class InsertRowsPlan extends InsertPlan { +public class InsertRowsPlan extends InsertPlan implements BatchPlan { /** * Suppose there is an InsertRowsPlan, which contains 5 InsertRowPlans, @@ -51,6 +52,8 @@ public class InsertRowsPlan extends InsertPlan { /** the InsertRowsPlan list */ private List<InsertRowPlan> insertRowPlanList; + boolean[] isExecuted; + /** record the result of insert rows */ private Map<Integer, TSStatus> results = new HashMap<>(); @@ -234,4 +237,38 @@ public class InsertRowsPlan extends InsertPlan { public TSStatus[] getFailingStatus() { return StatusUtils.getFailingStatus(results, insertRowPlanList.size()); } + + @Override + public void setIsExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + isExecuted[i] = true; + } + + @Override + public boolean isExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + return isExecuted[i]; + } + + @Override + public int getBatchSize() { + return insertRowPlanList.size(); + } + + @Override + public void unsetIsExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + isExecuted[i] = false; + if (insertRowPlanIndexList != null) { + results.remove(insertRowPlanIndexList.get(i)); + } else { + results.remove(i); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java index 4c0e854..d710afb 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; +import org.apache.iotdb.db.qp.physical.BatchPlan; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.utils.StatusUtils; import org.apache.iotdb.service.rpc.thrift.TSStatus; @@ -43,7 +44,7 @@ import java.util.TreeMap; /** * create multiple timeSeries, could be split to several sub Plans to execute in different DataGroup */ -public class CreateMultiTimeSeriesPlan extends PhysicalPlan { +public class CreateMultiTimeSeriesPlan extends PhysicalPlan implements BatchPlan { private List<PartialPath> paths; private List<TSDataType> dataTypes; @@ -54,6 +55,8 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan { private List<Map<String, String>> tags = null; private List<Map<String, String>> attributes = null; + boolean[] isExecuted; + /** record the result of creation of time series */ private Map<Integer, TSStatus> results = new TreeMap<>(); @@ -343,4 +346,38 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan { } } } + + @Override + public void setIsExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + isExecuted[i] = true; + } + + @Override + public boolean isExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + return isExecuted[i]; + } + + @Override + public int getBatchSize() { + return paths.size(); + } + + @Override + public void unsetIsExecuted(int i) { + if (isExecuted == null) { + isExecuted = new boolean[getBatchSize()]; + } + isExecuted[i] = false; + if (indexes != null) { + results.remove(indexes.get(i)); + } else { + results.remove(i); + } + } }
