This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.14.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git

commit 020a3d214734398887ba908db692581d8a7729f2
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Apr 19 13:19:45 2019 -0700

    Adds backwards-compatible serde for SeekableStreamStartSequenceNumbers. 
(#7512)
    
    This allows them to be deserialized by older Druid versions as
    KafkaPartitions objects.
    
    Fixes #7470.
---
 .../SeekableStreamEndSequenceNumbers.java          |  4 +-
 .../SeekableStreamStartSequenceNumbers.java        | 44 +++++++++++--
 .../SeekableStreamStartSequenceNumbersTest.java    | 77 ++++++++++++++++++++++
 3 files changed, 118 insertions(+), 7 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
index 3f87b27..7a4d3fa 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
@@ -105,7 +105,7 @@ public class 
SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
 
   /**
    * Identical to {@link #getStream()}. Here for backwards compatibility, so a 
serialized
-   * SeekableStreamStartSequenceNumbers can be read by older Druid versions as 
a KafkaPartitions object.
+   * SeekableStreamEndSequenceNumbers can be read by older Druid versions as a 
KafkaPartitions object.
    */
   @JsonProperty
   public String getTopic()
@@ -182,7 +182,7 @@ public class 
SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
 
   /**
    * Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for 
backwards compatibility, so a serialized
-   * SeekableStreamStartSequenceNumbers can be read by older Druid versions as 
a KafkaPartitions object.
+   * SeekableStreamEndSequenceNumbers can be read by older Druid versions as a 
KafkaPartitions object.
    */
   @JsonProperty
   public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
index f737292..9a25771 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
@@ -50,16 +50,21 @@ public class 
SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
   @JsonCreator
   public SeekableStreamStartSequenceNumbers(
       @JsonProperty("stream") final String stream,
+      // kept for backward compatibility
+      @JsonProperty("topic") final String topic,
       @JsonProperty("partitionSequenceNumberMap")
       final Map<PartitionIdType, SequenceOffsetType> 
partitionSequenceNumberMap,
+      // kept for backward compatibility
+      @JsonProperty("partitionOffsetMap") final Map<PartitionIdType, 
SequenceOffsetType> partitionOffsetMap,
       @JsonProperty("exclusivePartitions") @Nullable final 
Set<PartitionIdType> exclusivePartitions
   )
   {
-    this.stream = Preconditions.checkNotNull(stream, "stream");
-    this.partitionSequenceNumberMap = Preconditions.checkNotNull(
-        partitionSequenceNumberMap,
-        "partitionIdToSequenceNumberMap"
-    );
+    this.stream = stream == null ? topic : stream;
+    this.partitionSequenceNumberMap = partitionOffsetMap == null ? 
partitionSequenceNumberMap : partitionOffsetMap;
+
+    Preconditions.checkNotNull(this.stream, "stream");
+    Preconditions.checkNotNull(this.partitionSequenceNumberMap, 
"partitionIdToSequenceNumberMap");
+
     // exclusiveOffset can be null if this class is deserialized from metadata 
store. Note that only end offsets are
     // stored in metadata store.
     // The default is true because there was only Kafka indexing service 
before in which the end offset is always
@@ -67,6 +72,15 @@ public class 
SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
     this.exclusivePartitions = exclusivePartitions == null ? 
Collections.emptySet() : exclusivePartitions;
   }
 
+  public SeekableStreamStartSequenceNumbers(
+      String stream,
+      Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap,
+      Set<PartitionIdType> exclusivePartitions
+  )
+  {
+    this(stream, null, partitionSequenceNumberMap, null, exclusivePartitions);
+  }
+
   @Override
   @JsonProperty
   public String getStream()
@@ -74,6 +88,16 @@ public class 
SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
     return stream;
   }
 
+  /**
+   * Identical to {@link #getStream()}. Here for backwards compatibility, so a 
serialized
+   * SeekableStreamStartSequenceNumbers can be read by older Druid versions as 
a KafkaPartitions object.
+   */
+  @JsonProperty
+  public String getTopic()
+  {
+    return stream;
+  }
+
   @Override
   @JsonProperty
   public Map<PartitionIdType, SequenceOffsetType> 
getPartitionSequenceNumberMap()
@@ -81,6 +105,16 @@ public class 
SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
     return partitionSequenceNumberMap;
   }
 
+  /**
+   * Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for 
backwards compatibility, so a serialized
+   * SeekableStreamStartSequenceNumbers can be read by older Druid versions as 
a KafkaPartitions object.
+   */
+  @JsonProperty
+  public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
+  {
+    return partitionSequenceNumberMap;
+  }
+
   @Override
   public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> 
plus(
       SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
new file mode 100644
index 0000000..f4342e2
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class SeekableStreamStartSequenceNumbersTest
+{
+  private static final ObjectMapper OBJECT_MAPPER = 
TestHelper.makeJsonMapper();
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap = ImmutableMap.of(1, 2L, 3, 4L);
+
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions = new 
SeekableStreamStartSequenceNumbers<>(
+        stream,
+        offsetMap,
+        ImmutableSet.of(6)
+    );
+    final String serializedString = 
OBJECT_MAPPER.writeValueAsString(partitions);
+
+    // Check round-trip.
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions2 = 
OBJECT_MAPPER.readValue(
+        serializedString,
+        new TypeReference<SeekableStreamStartSequenceNumbers<Integer, Long>>() 
{}
+    );
+
+    Assert.assertEquals("Round trip", partitions, partitions2);
+
+    // Check backwards compatibility.
+    final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
+        serializedString,
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    Assert.assertEquals(stream, asMap.get("stream"));
+    Assert.assertEquals(stream, asMap.get("topic"));
+
+    // Jackson will deserialize the maps as string -> int maps, not int -> 
long.
+    Assert.assertEquals(
+        offsetMap,
+        OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), 
new TypeReference<Map<Integer, Long>>() {})
+    );
+    Assert.assertEquals(
+        offsetMap,
+        OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new 
TypeReference<Map<Integer, Long>>() {})
+    );
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to