This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9c0937b Allow to configure TypedMessageBuilder through a Map conf
object (#4015)
9c0937b is described below
commit 9c0937b85da38d25d6b0dbbcc2a58b0178dbf09f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 10 22:17:16 2019 -0700
Allow to configure TypedMessageBuilder through a Map conf object (#4015)
* Allow to configure TypedMessageBuilder through a Map conf object
* Use constants for message confs
* Reverted previous change
* Use Long instead of Number
---
.../api/SimpleTypedProducerConsumerTest.java | 80 ++++++++++++++++++++++
.../pulsar/client/api/TypedMessageBuilder.java | 42 ++++++++++++
.../client/impl/TypedMessageBuilderImpl.java | 27 ++++++++
.../apache/pulsar/client/util/TypeCheckUtil.java | 33 +++++++++
4 files changed, 182 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index 050db39..586f72b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -18,15 +18,23 @@
*/
package org.apache.pulsar.client.api;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
import java.time.Clock;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
+
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -623,4 +631,76 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+
+    /**
+     * Sends one message configured through {@code loadConf(Map)} and checks that the received
+     * message carries the key, properties, event time and sequence id taken from the map. Then
+     * checks that an unknown config key and a value of the wrong type are both rejected with a
+     * {@link RuntimeException}.
+     */
+    @Test
+    public void testMessageBuilderLoadConf() throws Exception {
+        final String topicName = "my-topic-" + System.nanoTime();
+
+        @Cleanup
+        Consumer<String> subscriber = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-subscriber-name")
+                .subscribe();
+
+        @Cleanup
+        Producer<String> publisher = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        Map<String, String> userProperties = new HashMap<>();
+        userProperties.put("a", "1");
+        userProperties.put("b", "2");
+
+        Map<String, Object> validConf = new HashMap<>();
+        validConf.put("key", "key-1");
+        validConf.put("properties", userProperties);
+        validConf.put("eventTime", 1234L);
+        validConf.put("sequenceId", 5L);
+        validConf.put("replicationClusters", Lists.newArrayList("a", "b", "c"));
+        validConf.put("disableReplication", false);
+
+        TypedMessageBuilder<String> builder = publisher.newMessage().value("my-message");
+        builder.loadConf(validConf).send();
+
+        Message<String> received = subscriber.receive();
+        Map<String, String> receivedProperties = received.getProperties();
+        assertEquals(received.getKey(), "key-1");
+        assertEquals(receivedProperties.get("a"), "1");
+        assertEquals(receivedProperties.get("b"), "2");
+        assertEquals(received.getEventTime(), 1234);
+        assertEquals(received.getSequenceId(), 5);
+
+        subscriber.acknowledge(received);
+
+        // Try with invalid confs
+        expectLoadConfFailure(publisher, Collections.<String, Object>singletonMap("nonExistingKey", "key-1"));
+
+        // Try with invalid type
+        expectLoadConfFailure(publisher, Collections.<String, Object>singletonMap("eventTime", "hello"));
+    }
+
+    /**
+     * Builds a message from {@code conf} and asserts that doing so raises a
+     * {@link RuntimeException}; the test fails if the message is sent without an error.
+     */
+    private static void expectLoadConfFailure(Producer<String> publisher, Map<String, Object> conf)
+            throws Exception {
+        try {
+            publisher.newMessage()
+                    .value("my-message")
+                    .loadConf(conf)
+                    .send();
+            fail("Should have failed");
+        } catch (RuntimeException expected) {
+            // expected
+        }
+    }
+
+
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
index 423c080..a1e2f2d 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
@@ -174,4 +174,46 @@ public interface TypedMessageBuilder<T> extends
Serializable {
* @return the message builder instance
*/
TypedMessageBuilder<T> disableReplication();
+
+ /**
+ * Configure the {@link TypedMessageBuilder} from a config map, as an alternative to calling
+ * the individual builder methods.
+ * <p>
+ * The "value" of the message itself cannot be set through the config map.
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ * Map<String, Object> conf = new HashMap<>();
+ * conf.put("key", "my-key");
+ * conf.put("eventTime", System.currentTimeMillis());
+ *
+ * producer.newMessage()
+ * .value("my-message")
+ * .loadConf(conf)
+ * .send();
+ * }</pre>
+ *
+ * The available options are listed below. Values are looked up as boxed objects, so numeric
+ * options must be supplied as {@code Long} and flags as {@code Boolean}.
+ * <table border="1">
+ * <tr><th>Constant</th><th>Name</th><th>Type</th><th>Doc</th></tr>
+ * <tr><td>{@link #CONF_KEY}</td><td>{@code key}</td><td>{@code String}</td><td>{@link #key(String)}</td></tr>
+ * <tr><td>{@link #CONF_PROPERTIES}</td><td>{@code properties}</td><td>{@code Map<String,String>}</td><td>{@link #properties(Map)}</td></tr>
+ * <tr><td>{@link #CONF_EVENT_TIME}</td><td>{@code eventTime}</td><td>{@code Long}</td><td>{@link #eventTime(long)}</td></tr>
+ * <tr><td>{@link #CONF_SEQUENCE_ID}</td><td>{@code sequenceId}</td><td>{@code Long}</td><td>{@link #sequenceId(long)}</td></tr>
+ * <tr><td>{@link #CONF_REPLICATION_CLUSTERS}</td><td>{@code replicationClusters}</td><td>{@code List<String>}</td><td>{@link #replicationClusters(List)}</td></tr>
+ * <tr><td>{@link #CONF_DISABLE_REPLICATION}</td><td>{@code disableReplication}</td><td>{@code Boolean}</td><td>{@link #disableReplication()} (applied only when the value is {@code true})</td></tr>
+ * </table>
+ *
+ * @param config a map with the configuration options for the message
+ * @return the message builder instance
+ * @throws RuntimeException if the map contains a key that is not listed above or a value of
+ * an unexpected type (this is the behavior of {@code TypedMessageBuilderImpl})
+ */
+ TypedMessageBuilder<T> loadConf(Map<String, Object> config);
+
+ /** Config-map key for the message key; expects a {@code String} value. See {@link #key(String)}. */
+ static final String CONF_KEY = "key";
+ /** Config-map key for the message properties; expects a {@code Map<String,String>} value. See {@link #properties(Map)}. */
+ static final String CONF_PROPERTIES = "properties";
+ /** Config-map key for the event time; expects a {@code Long} value. See {@link #eventTime(long)}. */
+ static final String CONF_EVENT_TIME = "eventTime";
+ /** Config-map key for the sequence id; expects a {@code Long} value. See {@link #sequenceId(long)}. */
+ static final String CONF_SEQUENCE_ID = "sequenceId";
+ /** Config-map key for the replication clusters; expects a {@code List<String>} value. See {@link #replicationClusters(List)}. */
+ static final String CONF_REPLICATION_CLUSTERS = "replicationClusters";
+ /** Config-map key for disabling replication; expects a {@code Boolean} value. See {@link #disableReplication()}. */
+ static final String CONF_DISABLE_REPLICATION = "disableReplication";
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index b13423e..fb18c0e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.client.util.TypeCheckUtil.checkType;
import com.google.common.base.Preconditions;
@@ -130,6 +131,32 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
return this;
}
+    /**
+     * Applies every entry of {@code config} to this builder by dispatching on the entry key to
+     * the matching builder method. Each value is validated with {@code checkType} before it is
+     * passed on, so a value of the wrong type is rejected by that helper.
+     *
+     * @param config map from {@code CONF_*} option names to their values
+     * @return this builder
+     * @throws RuntimeException if a key is not one of the {@code CONF_*} option names
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public TypedMessageBuilder<T> loadConf(Map<String, Object> config) {
+        for (Map.Entry<String, Object> option : config.entrySet()) {
+            final String confKey = option.getKey();
+            final Object confValue = option.getValue();
+
+            switch (confKey) {
+            case CONF_KEY:
+                key(checkType(confValue, String.class));
+                break;
+            case CONF_PROPERTIES:
+                // Raw Map: element types are not checked here, hence the unchecked suppression.
+                properties(checkType(confValue, Map.class));
+                break;
+            case CONF_EVENT_TIME:
+                eventTime(checkType(confValue, Long.class));
+                break;
+            case CONF_SEQUENCE_ID:
+                sequenceId(checkType(confValue, Long.class));
+                break;
+            case CONF_REPLICATION_CLUSTERS:
+                // Raw List: element types are not checked here, hence the unchecked suppression.
+                replicationClusters(checkType(confValue, List.class));
+                break;
+            case CONF_DISABLE_REPLICATION:
+                // A false value is accepted but leaves the builder untouched.
+                if (checkType(confValue, Boolean.class)) {
+                    disableReplication();
+                }
+                break;
+            default:
+                throw new RuntimeException("Invalid message config key '" + confKey + "'");
+            }
+        }
+        return this;
+    }
+
+
public MessageMetadata.Builder getMetadataBuilder() {
return msgMetadataBuilder;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java
new file mode 100644
index 0000000..cbabdfe
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import lombok.experimental.UtilityClass;
+
+/**
+ * Static helpers for validating the runtime type of loosely typed values, such as the entries
+ * of a {@code Map<String, Object>} configuration.
+ */
+@UtilityClass
+public class TypeCheckUtil {
+    /**
+     * Returns {@code o} as an instance of {@code clazz} after verifying its runtime type.
+     *
+     * @param o the value to check; {@code null} is rejected
+     * @param clazz the type the value is required to have
+     * @param <T> the required type
+     * @return {@code o}, cast to {@code T}
+     * @throws IllegalArgumentException if {@code o} is {@code null} or is not an instance of
+     *         {@code clazz}
+     */
+    public static <T> T checkType(Object o, Class<T> clazz) {
+        if (!clazz.isInstance(o)) {
+            // isInstance(null) is always false, so describe a null value explicitly instead of
+            // dereferencing it, which would surface as a NullPointerException with no context.
+            String actualType = (o == null) ? "null" : o.getClass().getName();
+            throw new IllegalArgumentException(
+                    String.format("Invalid object type '%s' when expecting '%s'", actualType, clazz.getName()));
+        }
+        // Checked cast: no unchecked warning to suppress.
+        return clazz.cast(o);
+    }
+}