This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6ccdaf0aaf [fix](storage-policy) use Long instead of Date to persiste
cooldowntime in storage policy (#14532)
6ccdaf0aaf is described below
commit 6ccdaf0aafd8cb0c5ebe05ce61d5f1e9eb5b15c0
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Nov 24 08:32:21 2022 +0800
[fix](storage-policy) use Long instead of Date to persiste cooldowntime in
storage policy (#14532)
Previously, we used the "Date" type for cooldownTime in StoragePolicy.
But Gson serializes the Date type differently in java8 and
java11, which may cause inconsistent meta errors.
This PR uses Long to save cooldownTime.
Note that in FE the cooldownTime is saved in milliseconds, while in
BE it is saved in seconds.
---
.../java/org/apache/doris/catalog/S3Resource.java | 7 +--
.../apache/doris/common/util/PropertyAnalyzer.java | 12 ++---
.../org/apache/doris/policy/StoragePolicy.java | 45 +++++++----------
.../apache/doris/service/FrontendServiceImpl.java | 10 ++--
.../doris/task/NotifyUpdateStoragePolicyTask.java | 16 +++---
.../doris/persist/StoragePolicyPersistTest.java | 58 ++++++++++++++++++++++
6 files changed, 95 insertions(+), 53 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
index aa303cfaaa..c21ca2c183 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
@@ -220,12 +220,7 @@ public class S3Resource extends Resource {
// add policy's coolDown ttl、coolDown data、policy name
to map
Map<String, String> tmpMap =
Maps.newHashMap(copiedProperties);
StoragePolicy used = (StoragePolicy) findPolicy.get();
- final String[] dateTimeToSecondTimestamp = {"-1"};
-
Optional.ofNullable(used.getCooldownDatetime()).ifPresent(date -> {
- long secondTimestamp = date.getTime() / 1000;
- dateTimeToSecondTimestamp[0] =
String.valueOf(secondTimestamp);
- });
- tmpMap.put(StoragePolicy.COOLDOWN_DATETIME,
dateTimeToSecondTimestamp[0]);
+ tmpMap.put(StoragePolicy.COOLDOWN_DATETIME,
String.valueOf(used.getCooldownTimestampMs()));
final String[] cooldownTtl = {"-1"};
Optional.ofNullable(used.getCooldownTtl())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 23a2e23808..ca872a5b3e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -216,15 +216,15 @@ public class PropertyAnalyzer {
StoragePolicy storagePolicy = (StoragePolicy) policy;
// check remote storage cool down timestamp
- if (storagePolicy.getCooldownDatetime() != null) {
- if (storagePolicy.getCooldownDatetime().getTime() <=
currentTimeMs) {
+ if (storagePolicy.getCooldownTimestampMs() != -1) {
+ if (storagePolicy.getCooldownTimestampMs() <= currentTimeMs) {
throw new AnalysisException("Remote storage cool down time
should later than now");
}
- if (hasCooldown &&
storagePolicy.getCooldownDatetime().getTime() <= cooldownTimeStamp) {
- throw new
AnalysisException("`remote_storage_cooldown_time`"
- + " should later than `storage_cooldown_time`.");
+ if (hasCooldown && storagePolicy.getCooldownTimestampMs() <=
cooldownTimeStamp) {
+ throw new AnalysisException(
+ "`remote_storage_cooldown_time`" + " should later
than `storage_cooldown_time`.");
}
- remoteCooldownTimeMs =
storagePolicy.getCooldownDatetime().getTime();
+ remoteCooldownTimeMs = storagePolicy.getCooldownTimestampMs();
} else if (storagePolicy.getCooldownTtl() != null &&
dataBaseTimeMs > 0) {
remoteCooldownTimeMs = dataBaseTimeMs +
storagePolicy.getCooldownTtlMs();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
index 653052cd59..5bbba5bd1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.system.SystemInfoService;
@@ -46,7 +47,6 @@ import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -106,8 +106,8 @@ public class StoragePolicy extends Policy {
@SerializedName(value = "storageResource")
private String storageResource = null;
- @SerializedName(value = "cooldownDatetime")
- private Date cooldownDatetime = null;
+ @SerializedName(value = "cooldownTimestampMs")
+ private long cooldownTimestampMs = -1;
@SerializedName(value = "cooldownTtl")
private String cooldownTtl = null;
@@ -127,15 +127,15 @@ public class StoragePolicy extends Policy {
* @param policyId policy id
* @param policyName policy name
* @param storageResource resource name for storage
- * @param cooldownDatetime cool down time
+ * @param cooldownTimestampMs cool down time
* @param cooldownTtl cool down time cost after partition is created
* @param cooldownTtlMs seconds for cooldownTtl
*/
public StoragePolicy(long policyId, final String policyName, final String
storageResource,
- final Date cooldownDatetime, final String cooldownTtl, long
cooldownTtlMs) {
+ final long cooldownTimestampMs, final String cooldownTtl, long
cooldownTtlMs) {
super(policyId, PolicyTypeEnum.STORAGE, policyName);
this.storageResource = storageResource;
- this.cooldownDatetime = cooldownDatetime;
+ this.cooldownTimestampMs = cooldownTimestampMs;
this.cooldownTtl = cooldownTtl;
this.cooldownTtlMs = cooldownTtlMs;
}
@@ -173,7 +173,7 @@ public class StoragePolicy extends Policy {
hasCooldownDatetime = true;
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
- this.cooldownDatetime = df.parse(props.get(COOLDOWN_DATETIME));
+ this.cooldownTimestampMs =
df.parse(props.get(COOLDOWN_DATETIME)).getTime();
} catch (ParseException e) {
throw new AnalysisException(String.format("cooldown_datetime
format error: %s",
props.get(COOLDOWN_DATETIME)), e);
@@ -232,13 +232,12 @@ public class StoragePolicy extends Policy {
props[0] = GsonUtils.GSON.toJson(copyMap);
});
}
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- if (cooldownDatetime == null) {
- return Lists.newArrayList(this.policyName, this.type.name(),
this.storageResource,
- "-1", this.cooldownTtl, props[0]);
+ if (cooldownTimestampMs == -1) {
+ return Lists.newArrayList(this.policyName, this.type.name(),
this.storageResource, "-1", this.cooldownTtl,
+ props[0]);
}
return Lists.newArrayList(this.policyName, this.type.name(),
this.storageResource,
- df.format(this.cooldownDatetime), this.cooldownTtl, props[0]);
+ TimeUtils.longToTimeString(this.cooldownTimestampMs),
this.cooldownTtl, props[0]);
}
@Override
@@ -246,7 +245,7 @@ public class StoragePolicy extends Policy {
@Override
public StoragePolicy clone() {
- return new StoragePolicy(this.policyId, this.policyName,
this.storageResource, this.cooldownDatetime,
+ return new StoragePolicy(this.policyId, this.policyName,
this.storageResource, this.cooldownTimestampMs,
this.cooldownTtl, this.cooldownTtlMs);
}
@@ -324,10 +323,7 @@ public class StoragePolicy extends Policy {
Map<String, String> copiedStoragePolicyProperties =
Env.getCurrentEnv().getResourceMgr()
.getResource(this.storageResource).getCopiedProperties();
- final String[] dateTimeToSecondTimestamp = {"-1"};
- Optional.ofNullable(this.cooldownDatetime).ifPresent(
- date -> dateTimeToSecondTimestamp[0] =
String.valueOf(this.cooldownDatetime.getTime() / 1000));
- copiedStoragePolicyProperties.put(COOLDOWN_DATETIME,
dateTimeToSecondTimestamp[0]);
+ copiedStoragePolicyProperties.put(COOLDOWN_DATETIME,
String.valueOf(this.cooldownTimestampMs));
copiedStoragePolicyProperties.put(COOLDOWN_TTL, this.cooldownTtl);
LOG.info("calcPropertiesMd5 map {}", copiedStoragePolicyProperties);
@@ -355,7 +351,7 @@ public class StoragePolicy extends Policy {
Optional.ofNullable(properties.get(COOLDOWN_DATETIME)).ifPresent(date
-> {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
-
this.setCooldownDatetime(df.parse(properties.get(COOLDOWN_DATETIME)));
+ this.cooldownTimestampMs =
df.parse(properties.get(COOLDOWN_DATETIME)).getTime();
} catch (ParseException e) {
throw new RuntimeException(e);
}
@@ -384,22 +380,17 @@ public class StoragePolicy extends Policy {
Map<String, String> tmpMap = Maps.newHashMap(copiedProperties);
- final String[] dateTimeToSecondTimestamp = {"-1"};
- Optional.ofNullable(this.cooldownDatetime).ifPresent(
- date -> dateTimeToSecondTimestamp[0] =
String.valueOf(this.cooldownDatetime.getTime() / 1000)
- );
- tmpMap.put(COOLDOWN_DATETIME, dateTimeToSecondTimestamp[0]);
+ tmpMap.put(COOLDOWN_DATETIME,
String.valueOf(this.cooldownTimestampMs));
Optional.ofNullable(this.getCooldownTtl()).ifPresent(date -> {
tmpMap.put(COOLDOWN_TTL, this.getCooldownTtl());
});
tmpMap.put(MD5_CHECKSUM, this.getMd5Checksum());
- NotifyUpdateStoragePolicyTask notifyUpdateStoragePolicyTask
- = new NotifyUpdateStoragePolicyTask(beId, getPolicyName(),
tmpMap);
+ NotifyUpdateStoragePolicyTask notifyUpdateStoragePolicyTask = new
NotifyUpdateStoragePolicyTask(beId,
+ getPolicyName(), tmpMap);
batchTask.addTask(notifyUpdateStoragePolicyTask);
LOG.info("update policy info to be: {}, policy name: {}, "
- + "properties: {} to modify S3 resource batch task.",
- beId, getPolicyName(), tmpMap);
+ + "properties: {} to modify S3 resource batch task.",
beId, getPolicyName(), tmpMap);
}
AgentTaskExecutor.submit(batchTask);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index a7a128b567..ed40966bf3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1151,17 +1151,13 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
rEntry.setPolicyName(iter.getPolicyName());
//java 8 not support ifPresentOrElse
final long[] ttlCoolDown = {-1};
- Optional.ofNullable(iter.getCooldownTtl())
- .ifPresent(ttl -> ttlCoolDown[0] =
Integer.parseInt(ttl));
+ Optional.ofNullable(iter.getCooldownTtl()).ifPresent(ttl
-> ttlCoolDown[0] = Integer.parseInt(ttl));
rEntry.setCooldownTtl(ttlCoolDown[0]);
- final long[] secondTimestamp = {-1};
- Optional.ofNullable(iter.getCooldownDatetime())
- .ifPresent(date -> secondTimestamp[0] = date.getTime()
/ 1000);
- rEntry.setCooldownDatetime(secondTimestamp[0]);
+ rEntry.setCooldownDatetime( // FE keeps milliseconds, BE expects seconds; -1 means "not set"
+ iter.getCooldownTimestampMs() == -1 ? -1 : iter.getCooldownTimestampMs() / 1000);
Optional.ofNullable(iter.getMd5Checksum()).ifPresent(rEntry::setMd5Checksum);
-
TS3StorageParam s3Info = new TS3StorageParam();
Optional.ofNullable(iter.getStorageResource()).ifPresent(resource -> {
Map<String, String> storagePolicyProperties =
Env.getCurrentEnv().getResourceMgr()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/NotifyUpdateStoragePolicyTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/NotifyUpdateStoragePolicyTask.java
index c2d0cc2ac5..2e29a64b44 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/task/NotifyUpdateStoragePolicyTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/task/NotifyUpdateStoragePolicyTask.java
@@ -43,15 +43,17 @@ public class NotifyUpdateStoragePolicyTask extends
AgentTask {
TGetStoragePolicy ret = new TGetStoragePolicy();
ret.policy_name = policyName;
- ret.cooldown_datetime =
Long.parseLong(properties.get(StoragePolicy.COOLDOWN_DATETIME));
+ // cooldown_datetime in BE is in seconds; floorDiv keeps the -1 "not set" sentinel (-1 / 1000 would be 0)
+ ret.cooldown_datetime = Math.floorDiv(Long.parseLong(properties.get(StoragePolicy.COOLDOWN_DATETIME)), 1000L);
ret.cooldown_ttl =
Long.parseLong(properties.get(StoragePolicy.COOLDOWN_TTL));
ret.s3_storage_param = new TS3StorageParam();
- ret.s3_storage_param.s3_max_conn =
Integer.parseInt(properties.getOrDefault(S3Resource.S3_MAX_CONNECTIONS,
- S3Resource.DEFAULT_S3_MAX_CONNECTIONS));
- ret.s3_storage_param.s3_request_timeout_ms =
Integer.parseInt(properties.getOrDefault(
- S3Resource.S3_REQUEST_TIMEOUT_MS,
S3Resource.DEFAULT_S3_REQUEST_TIMEOUT_MS));
- ret.s3_storage_param.s3_conn_timeout_ms =
Integer.parseInt(properties.getOrDefault(
- S3Resource.S3_CONNECTION_TIMEOUT_MS,
S3Resource.DEFAULT_S3_CONNECTION_TIMEOUT_MS));
+ ret.s3_storage_param.s3_max_conn = Integer.parseInt(
+ properties.getOrDefault(S3Resource.S3_MAX_CONNECTIONS,
S3Resource.DEFAULT_S3_MAX_CONNECTIONS));
+ ret.s3_storage_param.s3_request_timeout_ms = Integer.parseInt(
+ properties.getOrDefault(S3Resource.S3_REQUEST_TIMEOUT_MS,
S3Resource.DEFAULT_S3_REQUEST_TIMEOUT_MS));
+ ret.s3_storage_param.s3_conn_timeout_ms = Integer.parseInt(
+ properties.getOrDefault(S3Resource.S3_CONNECTION_TIMEOUT_MS,
+ S3Resource.DEFAULT_S3_CONNECTION_TIMEOUT_MS));
ret.s3_storage_param.s3_endpoint =
properties.get(S3Resource.S3_ENDPOINT);
ret.s3_storage_param.s3_region = properties.get(S3Resource.S3_REGION);
ret.s3_storage_param.root_path =
properties.get(S3Resource.S3_ROOT_PATH);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/persist/StoragePolicyPersistTest.java
b/fe/fe-core/src/test/java/org/apache/doris/persist/StoragePolicyPersistTest.java
new file mode 100644
index 0000000000..29791f1bcc
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/persist/StoragePolicyPersistTest.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.policy.StoragePolicy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+public class StoragePolicyPersistTest {
+    @Test
+    public void test() throws IOException {
+        long cooldownTime = System.currentTimeMillis();
+        StoragePolicy storagePolicy = new StoragePolicy(1, "test_policy", "resource1", cooldownTime, "-1", -1);
+        File file = new File("./StoragePolicyPersistTest");
+        file.createNewFile();
+        try {
+            // 1. Write objects to file; try-with-resources closes the stream even on failure
+            try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(file))) {
+                storagePolicy.write(dos);
+                dos.flush();
+            }
+            // 2. Read objects from file and check the millisecond timestamp survives the round trip
+            try (DataInputStream dis = new DataInputStream(new FileInputStream(file))) {
+                StoragePolicy anotherStoragePolicy = StoragePolicy.read(dis);
+                Assert.assertEquals(cooldownTime, anotherStoragePolicy.getCooldownTimestampMs());
+                // clone() must carry the timestamp over as well
+                StoragePolicy clonePolicy = anotherStoragePolicy.clone();
+                Assert.assertEquals(cooldownTime, clonePolicy.getCooldownTimestampMs());
+            }
+        } finally {
+            // 3. delete files, even when a step above throws
+            file.delete();
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]