This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5b15894 [Bug] Fix SchemaChangeJobV2's meta persist bug (#3804)
5b15894 is described below
commit 5b1589498a1c7722739eccaf2763c01fac6705eb
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Jun 9 21:55:46 2020 +0800
[Bug] Fix SchemaChangeJobV2's meta persist bug (#3804)
1. The field `partitionIndexMap` was missing from the persisted metadata of SchemaChangeJobV2.
2. The `Pair` values in the field `indexSchemaVersionAndHashMap` cannot be persisted by GSON.
3. Exit the FE process when replaying the edit log fails.
Fix: #3802
---
.../org/apache/doris/alter/SchemaChangeJobV2.java | 21 ++++----
.../apache/doris/common/SchemaVersionAndHash.java | 61 ++++++++++++++++++++++
.../java/org/apache/doris/persist/EditLog.java | 3 +-
.../apache/doris/alter/SchemaChangeJobV2Test.java | 14 ++++-
4 files changed, 87 insertions(+), 12 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 08838eb..bdede94 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -35,7 +35,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MarkedCountDownLatch;
-import org.apache.doris.common.Pair;
+import org.apache.doris.common.SchemaVersionAndHash;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
@@ -85,6 +85,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
@SerializedName(value = "partitionIndexTabletMap")
private Table<Long, Long, Map<Long, Long>> partitionIndexTabletMap =
HashBasedTable.create();
// partition id -> (shadow index id -> shadow index))
+ @SerializedName(value = "partitionIndexMap")
private Table<Long, Long, MaterializedIndex> partitionIndexMap =
HashBasedTable.create();
// shadow index id -> origin index id
@SerializedName(value = "indexIdMap")
@@ -97,7 +98,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
private Map<Long, List<Column>> indexSchemaMap = Maps.newHashMap();
// shadow index id -> (shadow index schema version : schema hash)
@SerializedName(value = "indexSchemaVersionAndHashMap")
- private Map<Long, Pair<Integer, Integer>> indexSchemaVersionAndHashMap =
Maps.newHashMap();
+ private Map<Long, SchemaVersionAndHash> indexSchemaVersionAndHashMap =
Maps.newHashMap();
// shadow index id -> shadow index short key count
@SerializedName(value = "indexShortKeyMap")
private Map<Long, Short> indexShortKeyMap = Maps.newHashMap();
@@ -151,7 +152,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
short shadowIdxShortKeyCount, List<Column> shadowIdxSchema) {
indexIdMap.put(shadowIdxId, originIdxId);
indexIdToName.put(shadowIdxId, shadowIndexName);
- indexSchemaVersionAndHashMap.put(shadowIdxId,
Pair.create(shadowSchemaVersion, shadowSchemaHash));
+ indexSchemaVersionAndHashMap.put(shadowIdxId, new
SchemaVersionAndHash(shadowSchemaVersion, shadowSchemaHash));
indexShortKeyMap.put(shadowIdxId, shadowIdxShortKeyCount);
indexSchemaMap.put(shadowIdxId, shadowIdxSchema);
}
@@ -234,7 +235,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
short shadowShortKeyColumnCount =
indexShortKeyMap.get(shadowIdxId);
List<Column> shadowSchema =
indexSchemaMap.get(shadowIdxId);
- int shadowSchemaHash =
indexSchemaVersionAndHashMap.get(shadowIdxId).second;
+ int shadowSchemaHash =
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
int originSchemaHash =
tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));
for (Tablet shadowTablet : shadowIdx.getTablets()) {
@@ -334,8 +335,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
for (long shadowIdxId : indexIdMap.keySet()) {
tbl.setIndexMeta(shadowIdxId, indexIdToName.get(shadowIdxId),
indexSchemaMap.get(shadowIdxId),
- indexSchemaVersionAndHashMap.get(shadowIdxId).first,
- indexSchemaVersionAndHashMap.get(shadowIdxId).second,
+
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion,
+ indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash,
indexShortKeyMap.get(shadowIdxId), TStorageType.COLUMN,
null);
}
@@ -385,7 +386,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
MaterializedIndex shadowIdx = entry.getValue();
long originIdxId = indexIdMap.get(shadowIdxId);
- int shadowSchemaHash =
indexSchemaVersionAndHashMap.get(shadowIdxId).second;
+ int shadowSchemaHash =
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
int originSchemaHash =
tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));
for (Tablet shadowTablet : shadowIdx.getTablets()) {
@@ -679,7 +680,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
TStorageMedium medium =
tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId,
partitionId, shadowIndexId,
-
indexSchemaVersionAndHashMap.get(shadowIndexId).second, medium);
+
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium);
for (Tablet shadownTablet : shadowIndex.getTablets()) {
invertedIndex.addTablet(shadownTablet.getId(),
shadowTabletMeta);
@@ -867,7 +868,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
int schemaVersion = in.readInt();
int schemaVersionHash = in.readInt();
- Pair<Integer, Integer> schemaVersionAndHash =
Pair.create(schemaVersion, schemaVersionHash);
+ SchemaVersionAndHash schemaVersionAndHash = new
SchemaVersionAndHash(schemaVersion, schemaVersionHash);
short shortKeyCount = in.readShort();
indexIdMap.put(shadowIndexId, originIndexId);
@@ -923,7 +924,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
String indexName = Text.readString(in);
int schemaVersion = in.readInt();
int schemaVersionHash = in.readInt();
- Pair<Integer, Integer> schemaVersionAndHash =
Pair.create(schemaVersion, schemaVersionHash);
+ SchemaVersionAndHash schemaVersionAndHash = new
SchemaVersionAndHash(schemaVersion, schemaVersionHash);
indexIdMap.put(shadowIndexId, originIndexId);
indexIdToName.put(shadowIndexId, indexName);
diff --git a/fe/src/main/java/org/apache/doris/common/SchemaVersionAndHash.java
b/fe/src/main/java/org/apache/doris/common/SchemaVersionAndHash.java
new file mode 100644
index 0000000..7932d52
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/common/SchemaVersionAndHash.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/*
+ * Schema version and schema hash pair, serialized as JSON through GSON
+ * under the field names "version" and "hash" (see write() and read()).
+ */
+public class SchemaVersionAndHash implements Writable {
+
+ @SerializedName(value = "version")
+ public int schemaVersion;
+ @SerializedName(value = "hash")
+ public int schemaHash;
+
+ public SchemaVersionAndHash(int schemaVersion, int schemaHash) {
+ this.schemaVersion = schemaVersion;
+ this.schemaHash = schemaHash;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ @Override
+ public String toString() {
+ return schemaVersion + ":" + schemaHash;
+ }
+
+ public static SchemaVersionAndHash read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, SchemaVersionAndHash.class);
+ }
+}
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 3762095..fa3ef9e 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -29,9 +29,9 @@ import org.apache.doris.backup.RestoreJob;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSearchDesc;
+import org.apache.doris.catalog.Resource;
import org.apache.doris.cluster.BaseParam;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.Config;
@@ -779,6 +779,7 @@ public class EditLog {
}
} catch (Exception e) {
LOG.error("Operation Type {}", opCode, e);
+ System.exit(-1);
}
}
diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index edcf8ee..1eea8bb 100644
--- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -52,6 +52,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.SchemaVersionAndHash;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.meta.MetaContext;
@@ -62,6 +63,8 @@ import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.FakeTransactionIDGenerator;
import org.apache.doris.transaction.GlobalTransactionMgr;
+import com.google.common.collect.Maps;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -393,7 +396,9 @@ public class SchemaChangeJobV2Test {
SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2(1, 1,1,
"test",600000);
schemaChangeJobV2.setStorageFormat(TStorageFormat.V2);
Deencapsulation.setField(schemaChangeJobV2, "jobState",
AlterJobV2.JobState.FINISHED);
-
+ Map<Long, SchemaVersionAndHash> indexSchemaVersionAndHashMap =
Maps.newHashMap();
+ indexSchemaVersionAndHashMap.put(Long.valueOf(1000), new
SchemaVersionAndHash(10, 20));
+ Deencapsulation.setField(schemaChangeJobV2,
"indexSchemaVersionAndHashMap", indexSchemaVersionAndHashMap);
// write schema change job
schemaChangeJobV2.write(out);
@@ -410,5 +415,12 @@ public class SchemaChangeJobV2Test {
Assert.assertEquals(1, result.getJobId());
Assert.assertEquals(AlterJobV2.JobState.FINISHED,
result.getJobState());
Assert.assertEquals(TStorageFormat.V2,
Deencapsulation.getField(result, "storageFormat"));
+
+ Assert.assertNotNull(Deencapsulation.getField(result,
"partitionIndexMap"));
+ Assert.assertNotNull(Deencapsulation.getField(result,
"partitionIndexTabletMap"));
+
+ Map<Long, SchemaVersionAndHash> map = Deencapsulation.getField(result,
"indexSchemaVersionAndHashMap");
+ Assert.assertEquals(10, map.get(1000L).schemaVersion);
+ Assert.assertEquals(20, map.get(1000L).schemaHash);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]