This is an automated email from the ASF dual-hosted git repository.
linlin pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 660b78d Fix incompatibility of BacklogQuota (#13291) (#13353)
660b78d is described below
commit 660b78d433c2b5b8a4c8f351f2e5499ae6512991
Author: feynmanlin <[email protected]>
AuthorDate: Thu Dec 16 19:35:13 2021 +0800
Fix incompatibility of BacklogQuota (#13291) (#13353)
* Fix incompatibility of BacklogQuota (#13291)
# Conflicts:
#
pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
#
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
* Fix jar mentioned in lib/presto/LICENSE, but not bundled
* Fix jar mentioned in lib/presto/LICENSE, but not bundled
Co-authored-by: ZhangJian He <[email protected]>
---
.../pulsar/common/policies/data/BacklogQuota.java | 9 +++
.../policies/data/impl/BacklogQuotaImpl.java | 77 ++++++++++++++++--
.../common/policies/data/BacklogQuotaMixIn.java | 26 ------
.../pulsar/common/util/ObjectMapperFactory.java | 2 -
.../common/util/ObjectMapperFactoryTest.java | 29 -------
.../metadata/BacklogQuotaCompatibilityTest.java | 94 +++++++++++++++++-----
pulsar-sql/presto-distribution/LICENSE | 2 -
7 files changed, 153 insertions(+), 86 deletions(-)
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
index d4b5c4b..4604710 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
@@ -30,6 +30,15 @@ public interface BacklogQuota {
/**
* Gets quota limit in size.
+ * Remains for compatible
+ *
+ * @return quota limit in bytes
+ */
+ @Deprecated
+ long getLimit();
+
+ /**
+ * Gets quota limit in size.
*
* @return quota limit in bytes
*/
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
index 79e0af2..41f475c 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
@@ -18,23 +18,86 @@
*/
package org.apache.pulsar.common.policies.data.impl;
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import lombok.ToString;
import org.apache.pulsar.common.policies.data.BacklogQuota;
-@Data
-@AllArgsConstructor
+@ToString
+@EqualsAndHashCode
@NoArgsConstructor
public class BacklogQuotaImpl implements BacklogQuota {
public static final long BYTES_IN_GIGABYTE = 1024 * 1024 * 1024;
- // backlog quota by size in byte
- private long limitSize;
- // backlog quota by time in second
+ /**
+ * backlog quota by size in byte, remains for compatible.
+ * for the details: https://github.com/apache/pulsar/pull/13291
+ * @since 2.9.1
+ */
+ @Deprecated
+ private Long limit;
+
+ /**
+ * backlog quota by size in byte.
+ */
+ private Long limitSize;
+
+ /**
+ * backlog quota by time in second.
+ */
private int limitTime;
private RetentionPolicy policy;
+ public BacklogQuotaImpl(long limitSize, int limitTime, RetentionPolicy
policy) {
+ this.limitSize = limitSize;
+ this.limitTime = limitTime;
+ this.policy = policy;
+ }
+
+ @Deprecated
+ public long getLimit() {
+ if (limitSize == null) {
+ // the limitSize and limit can't be both null
+ return limit;
+ }
+ return limitSize;
+ }
+
+ @Deprecated
+ public void setLimit(long limit) {
+ this.limit = limit;
+ this.limitSize = limit;
+ }
+
+ public long getLimitSize() {
+ if (limitSize == null) {
+ // the limitSize and limit can't be both null
+ return limit;
+ }
+ return limitSize;
+ }
+
+ public void setLimitSize(long limitSize) {
+ this.limitSize = limitSize;
+ this.limit = limitSize;
+ }
+
+ public int getLimitTime() {
+ return limitTime;
+ }
+
+ public void setLimitTime(int limitTime) {
+ this.limitTime = limitTime;
+ }
+
+ public RetentionPolicy getPolicy() {
+ return policy;
+ }
+
+ public void setPolicy(RetentionPolicy policy) {
+ this.policy = policy;
+ }
+
public static BacklogQuotaImplBuilder builder() {
return new BacklogQuotaImplBuilder();
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuotaMixIn.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuotaMixIn.java
deleted file mode 100644
index a156240..0000000
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuotaMixIn.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.policies.data;
-
-import com.fasterxml.jackson.annotation.JsonAlias;
-
-public abstract class BacklogQuotaMixIn {
- @JsonAlias("limit")
- private long limitSize;
-}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
index 94e1b7a..ef2e489 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
@@ -37,7 +37,6 @@ import
org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.BacklogQuotaMixIn;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesClusterInfo;
@@ -192,7 +191,6 @@ public class ObjectMapperFactory {
resolver.addMapping(AutoSubscriptionCreationOverride.class,
AutoSubscriptionCreationOverrideImpl.class);
// we use MixIn class to add jackson annotations
- mapper.addMixIn(BacklogQuotaImpl.class, BacklogQuotaMixIn.class);
mapper.addMixIn(ResourceQuota.class, ResourceQuotaMixIn.class);
mapper.addMixIn(FunctionConfig.class, JsonIgnorePropertiesMixIn.class);
mapper.addMixIn(FunctionState.class, JsonIgnorePropertiesMixIn.class);
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
index dc8aa8b..466585d 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
@@ -19,41 +19,12 @@
package org.apache.pulsar.common.util;
import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.ToString;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ResourceQuota;
-import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.stats.Metrics;
import org.testng.Assert;
import org.testng.annotations.Test;
public class ObjectMapperFactoryTest {
- @Test
- public void testBacklogQuotaMixIn() {
- ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal();
- String json =
"{\"limit\":10,\"limitTime\":0,\"policy\":\"producer_request_hold\"}";
- try {
- BacklogQuota backlogQuota = objectMapper.readValue(json,
BacklogQuota.class);
- Assert.assertEquals(backlogQuota.getLimitSize(), 10);
- Assert.assertEquals(backlogQuota.getLimitTime(), 0);
- Assert.assertEquals(backlogQuota.getPolicy(),
BacklogQuota.RetentionPolicy.producer_request_hold);
- } catch (Exception ex) {
- Assert.fail("shouldn't have thrown exception", ex);
- }
-
- try {
- String expectJson =
"{\"limitSize\":10,\"limitTime\":0,\"policy\":\"producer_request_hold\"}";
- BacklogQuota backlogQuota = BacklogQuota.builder()
- .limitSize(10)
- .limitTime(0)
-
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold)
- .build();
- String writeJson = objectMapper.writeValueAsString(backlogQuota);
- Assert.assertEquals(expectJson, writeJson);
- } catch (Exception ex) {
- Assert.fail("shouldn't have thrown exception", ex);
- }
- }
@Test
public void testResourceQuotaMixIn() {
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
index a1333bc..e823d43 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
@@ -18,37 +18,91 @@
*/
package org.apache.pulsar.metadata;
+import static org.testng.Assert.assertEquals;
+
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
+import java.io.IOException;
+import java.util.HashMap;
+
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
+import org.testng.Assert;
import org.testng.annotations.Test;
-import java.io.IOException;
+public class BacklogQuotaCompatibilityTest {
-import static org.junit.Assert.assertEquals;
+ private final JavaType typeRef =
TypeFactory.defaultInstance().constructSimpleType(Policies.class, null);
-public class BacklogQuotaCompatibilityTest {
+ private final JSONMetadataSerdeSimpleType<Policies> simpleType = new
JSONMetadataSerdeSimpleType<>(typeRef);
+
+ private final BacklogQuota.RetentionPolicy testPolicy =
BacklogQuota.RetentionPolicy.consumer_backlog_eviction;
+
+ @Test
+ public void testV27ClientSetV28BrokerRead() throws Exception {
+ Policies writePolicy = new Policies();
+ BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
+ writeBacklogQuota.setLimit(1024);
+ writeBacklogQuota.setLimitTime(60);
+ writeBacklogQuota.setPolicy(testPolicy);
+ HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaHashMap =
new HashMap<>();
+ quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage,
writeBacklogQuota);
+ writePolicy.backlog_quota_map = quotaHashMap;
+ byte[] serialize = simpleType.serialize(writePolicy);
+ Policies policies = simpleType.deserialize(serialize);
+ BacklogQuota readBacklogQuota =
policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+ Assert.assertEquals(readBacklogQuota.getLimitSize(), 1024);
+ Assert.assertEquals(readBacklogQuota.getLimitTime(), 60);
+ Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy);
+ }
+
+ @Test
+ public void testV28ClientSetV28BrokerRead() throws Exception {
+ Policies writePolicy = new Policies();
+ BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
+ writeBacklogQuota.setLimitSize(1024);
+ writeBacklogQuota.setLimitTime(60);
+ writeBacklogQuota.setPolicy(testPolicy);
+ HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaHashMap =
new HashMap<>();
+ quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage,
writeBacklogQuota);
+ writePolicy.backlog_quota_map = quotaHashMap;
+ byte[] serialize = simpleType.serialize(writePolicy);
+ Policies policies = simpleType.deserialize(serialize);
+ BacklogQuota readBacklogQuota =
policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+ Assert.assertEquals(readBacklogQuota.getLimit(), 1024);
+ Assert.assertEquals(readBacklogQuota.getLimitTime(), 60);
+ Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy);
+ }
+
+ @Test
+ public void testV28ClientSetV27BrokerRead() {
+ BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
+ writeBacklogQuota.setLimitSize(1024);
+ Assert.assertEquals(1024, writeBacklogQuota.getLimit());
+ }
@Test
public void testBackwardCompatibility() throws IOException {
- String oldPolicyStr =
"{\"auth_policies\":{\"namespace_auth\":{},\"destination_auth\":{}," +
-
"\"subscription_auth_roles\":{}},\"replication_clusters\":[],\"backlog_quota_map\":"
+
-
"{\"destination_storage\":{\"limit\":1001,\"policy\":\"consumer_backlog_eviction\"}},"
+
-
"\"clusterDispatchRate\":{},\"topicDispatchRate\":{},\"subscriptionDispatchRate\":{},"
+
-
"\"replicatorDispatchRate\":{},\"clusterSubscribeRate\":{},\"publishMaxMessageRate\":{},"
+
-
"\"latency_stats_sample_rate\":{},\"subscription_expiration_time_minutes\":0,\"deleted\":false,"
+
-
"\"encryption_required\":false,\"subscription_auth_mode\":\"None\"," +
-
"\"max_consumers_per_subscription\":0,\"offload_threshold\":-1," +
-
"\"schema_auto_update_compatibility_strategy\":\"Full\",\"schema_compatibility_strategy\":"
+
-
"\"UNDEFINED\",\"is_allow_auto_update_schema\":true,\"schema_validation_enforced\":false,"
+
- "\"subscription_types_enabled\":[]}\n";
-
- JSONMetadataSerdeSimpleType jsonMetadataSerdeSimpleType = new
JSONMetadataSerdeSimpleType(TypeFactory.defaultInstance().constructSimpleType(Policies.class,
null));
- Policies policies = (Policies)
jsonMetadataSerdeSimpleType.deserialize(oldPolicyStr.getBytes());
-
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
1001);
-
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(),
0);
-
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getPolicy(),
BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+ String oldPolicyStr =
"{\"auth_policies\":{\"namespace_auth\":{},\"destination_auth\":{},"
+ +
"\"subscription_auth_roles\":{}},\"replication_clusters\":[],\"backlog_quota_map\":"
+ +
"{\"destination_storage\":{\"limit\":1001,\"policy\":\"consumer_backlog_eviction\"}},"
+ +
"\"clusterDispatchRate\":{},\"topicDispatchRate\":{},\"subscriptionDispatchRate\":{},"
+ +
"\"replicatorDispatchRate\":{},\"clusterSubscribeRate\":{},\"publishMaxMessageRate\":{},"
+ +
"\"latency_stats_sample_rate\":{},\"subscription_expiration_time_minutes\":0,\"deleted\":false,"
+ +
"\"encryption_required\":false,\"subscription_auth_mode\":\"None\","
+ +
"\"max_consumers_per_subscription\":0,\"offload_threshold\":-1,"
+ +
"\"schema_auto_update_compatibility_strategy\":\"Full\",\"schema_compatibility_strategy\":"
+ +
"\"UNDEFINED\",\"is_allow_auto_update_schema\":true,\"schema_validation_enforced\":false,"
+ + "\"subscription_types_enabled\":[]}\n";
+ Policies policies = simpleType.deserialize(oldPolicyStr.getBytes());
+
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
+ 1001);
+
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(),
+ 0);
+
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getPolicy(),
+ BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
}
}
diff --git a/pulsar-sql/presto-distribution/LICENSE
b/pulsar-sql/presto-distribution/LICENSE
index ee58b53..b27c3ea 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -238,8 +238,6 @@ The Apache Software License, Version 2.0
- netty-codec-dns-4.1.72.Final.jar
- netty-codec-http-4.1.72.Final.jar
- netty-codec-haproxy-4.1.72.Final.jar
- - netty-codec-socks-4.1.72.Final.jar
- - netty-handler-proxy-4.1.72.Final.jar
- netty-common-4.1.72.Final.jar
- netty-handler-4.1.72.Final.jar
- netty-reactive-streams-2.0.4.jar