This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c591ff8  Add NonnullPair (#10013)
c591ff8 is described below

commit c591ff8ea8f4c2051c2c4721b2b6b82bea54d107
Author: Jihoon Son <[email protected]>
AuthorDate: Fri Jun 26 09:52:06 2020 -0700

    Add NonnullPair (#10013)
    
    * Add NonnullPair
    
    * new line
    
    * test
    
    * make it consistent
---
 .../apache/druid/java/util/common/NonnullPair.java | 65 ++++++++++++++++++++++
 .../druid/java/util/common/NonnullPairTest.java    | 55 ++++++++++++++++++
 .../druid/indexing/common/task/CompactionTask.java | 58 +++++++++----------
 3 files changed, 149 insertions(+), 29 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/NonnullPair.java b/core/src/main/java/org/apache/druid/java/util/common/NonnullPair.java
new file mode 100644
index 0000000..71c7e30
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/common/NonnullPair.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+public class NonnullPair<L, R>
+{
+  public final L lhs;
+  public final R rhs;
+
+  public NonnullPair(L lhs, R rhs)
+  {
+    this.lhs = Preconditions.checkNotNull(lhs, "lhs");
+    this.rhs = Preconditions.checkNotNull(rhs, "rhs");
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    NonnullPair<?, ?> that = (NonnullPair<?, ?>) o;
+    return Objects.equals(lhs, that.lhs) &&
+           Objects.equals(rhs, that.rhs);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(lhs, rhs);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "NonnullPair{" +
+           "lhs=" + lhs +
+           ", rhs=" + rhs +
+           '}';
+  }
+}
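
For readers skimming the diff: a minimal usage sketch of the class added above. The variable names are illustrative; the fail-fast behavior and the exception messages come straight from the Preconditions.checkNotNull calls in the constructor.

    // Hypothetical usage of the NonnullPair class added above.
    NonnullPair<String, Integer> pair = new NonnullPair<>("segments", 3);
    String label = pair.lhs;   // fields are public and final
    int count = pair.rhs;      // safe to unbox: rhs is never null

    // Each of these fails fast in the constructor:
    // new NonnullPair<>(null, 3);          -> NullPointerException("lhs")
    // new NonnullPair<>("segments", null); -> NullPointerException("rhs")
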
diff --git a/core/src/test/java/org/apache/druid/java/util/common/NonnullPairTest.java b/core/src/test/java/org/apache/druid/java/util/common/NonnullPairTest.java
new file mode 100644
index 0000000..354ec62
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/common/NonnullPairTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class NonnullPairTest
+{
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(NonnullPair.class).withNonnullFields("lhs", "rhs").usingGetClass().verify();
+  }
+
+  @Test
+  public void testConstructorWithNull()
+  {
+    expectedException.expect(NullPointerException.class);
+    expectedException.expectMessage("lhs");
+    //noinspection ResultOfObjectAllocationIgnored
+    new NonnullPair<>(null, "nullTest");
+  }
+
+  @Test
+  public void testGets()
+  {
+    final NonnullPair<Integer, Integer> pair = new NonnullPair<>(20, 30);
+    Assert.assertEquals(20, pair.lhs.intValue());
+    Assert.assertEquals(30, pair.rhs.intValue());
+  }
+}
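
A note on the test above: it relies on JUnit 4's ExpectedException rule. On JUnit 4.13 or later, the same check can be written with Assert.assertThrows. The following is a sketch of that alternative, not what was committed:

    @Test
    public void testConstructorWithNull()
    {
      // assertThrows returns the thrown exception for further inspection.
      final NullPointerException e = Assert.assertThrows(
          NullPointerException.class,
          () -> new NonnullPair<>(null, "nullTest")
      );
      // Preconditions.checkNotNull(lhs, "lhs") uses "lhs" as the message.
      Assert.assertEquals("lhs", e.getMessage());
    }
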
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 3ce3f31..5cfaf32 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -64,7 +64,7 @@ import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.NonnullPair;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -103,6 +103,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -494,7 +495,7 @@ public class CompactionTask extends AbstractBatchIndexTask
       final RetryPolicyFactory retryPolicyFactory
   ) throws IOException, SegmentLoadingException
   {
-    Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
+    NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
         toolbox,
         segmentProvider,
         lockGranularityInUse
@@ -508,7 +509,7 @@ public class CompactionTask extends AbstractBatchIndexTask
 
     // find metadata for interval
     // queryableIndexAndSegments is sorted by the interval of the dataSegment
-    final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
+    final List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
         timelineSegments,
         segmentFileMap,
         toolbox.getIndexIO()
@@ -518,20 +519,20 @@ public class CompactionTask extends AbstractBatchIndexTask
 
     if (segmentGranularity == null) {
       // original granularity
-      final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
+      final Map<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
           Comparators.intervalsByStartThenEnd()
       );
-      //noinspection ConstantConditions
       queryableIndexAndSegments.forEach(
           p -> intervalToSegments.computeIfAbsent(p.rhs.getInterval(), k -> new ArrayList<>())
                                  .add(p)
       );
 
       // unify overlapping intervals to ensure overlapping segments compacting in the same indexSpec
-      List<Pair<Interval, List<Pair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified = new ArrayList<>();
+      List<NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified =
+          new ArrayList<>();
       Interval union = null;
-      List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
-      for (Map.Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
+      List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
+      for (Entry<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
         Interval cur = entry.getKey();
         if (union == null) {
           union = cur;
@@ -540,24 +541,23 @@ public class CompactionTask extends AbstractBatchIndexTask
           union = Intervals.utc(union.getStartMillis(), Math.max(union.getEndMillis(), cur.getEndMillis()));
           segments.addAll(entry.getValue());
         } else {
-          intervalToSegmentsUnified.add(Pair.of(union, segments));
+          intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));
           union = cur;
           segments = new ArrayList<>(entry.getValue());
         }
       }
-      intervalToSegmentsUnified.add(Pair.of(union, segments));
+      intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));
 
       final List<ParallelIndexIngestionSpec> specs = new ArrayList<>(intervalToSegmentsUnified.size());
-      for (Pair<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
+      for (NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
         final Interval interval = entry.lhs;
-        final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
+        final List<NonnullPair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
         final DataSchema dataSchema = createDataSchema(
             segmentProvider.dataSource,
             segmentsToCompact,
             dimensionsSpec,
             metricsSpec,
-            GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity(),
-            jsonMapper
+            GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity()
         );
 
         specs.add(
@@ -584,8 +584,7 @@ public class CompactionTask extends AbstractBatchIndexTask
           queryableIndexAndSegments,
           dimensionsSpec,
           metricsSpec,
-          segmentGranularity,
-          jsonMapper
+          segmentGranularity
       );
 
       return Collections.singletonList(
@@ -633,7 +632,7 @@ public class CompactionTask extends AbstractBatchIndexTask
     );
   }
 
-  private static Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> prepareSegments(
+  private static NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> prepareSegments(
       TaskToolbox toolbox,
       SegmentProvider segmentProvider,
       LockGranularity lockGranularityInUse
@@ -645,20 +644,19 @@ public class CompactionTask extends AbstractBatchIndexTask
     final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = VersionedIntervalTimeline
         .forSegments(usedSegments)
         .lookup(segmentProvider.interval);
-    return Pair.of(segmentFileMap, timelineSegments);
+    return new NonnullPair<>(segmentFileMap, timelineSegments);
   }
 
   private static DataSchema createDataSchema(
       String dataSource,
-      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
+      List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
       @Nullable DimensionsSpec dimensionsSpec,
       @Nullable AggregatorFactory[] metricsSpec,
-      Granularity segmentGranularity,
-      ObjectMapper jsonMapper
+      Granularity segmentGranularity
   )
   {
     // check index metadata
-    for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
+    for (NonnullPair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
       final QueryableIndex index = pair.lhs;
       if (index.getMetadata() == null) {
         throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
@@ -703,7 +701,7 @@ public class CompactionTask extends AbstractBatchIndexTask
   }
 
   private static AggregatorFactory[] createMetricsSpec(
-      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
+      List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
   )
   {
     final List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
@@ -725,7 +723,7 @@ public class CompactionTask extends AbstractBatchIndexTask
                  .toArray(AggregatorFactory[]::new);
   }
 
-  private static DimensionsSpec createDimensionsSpec(List<Pair<QueryableIndex, DataSegment>> queryableIndices)
+  private static DimensionsSpec createDimensionsSpec(List<NonnullPair<QueryableIndex, DataSegment>> queryableIndices)
   {
     final BiMap<String, Integer> uniqueDims = HashBiMap.create();
     final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
@@ -736,10 +734,12 @@ public class CompactionTask extends AbstractBatchIndexTask
     // frequently, and thus the performance should be optimized for recent ones rather than old ones.
 
     // sort timelineSegments in order of interval, see https://github.com/apache/druid/pull/9905
-    queryableIndices.sort((o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o1.rhs.getInterval(), o2.rhs.getInterval()));
+    queryableIndices.sort(
+        (o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o1.rhs.getInterval(), o2.rhs.getInterval())
+    );
 
     int index = 0;
-    for (Pair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
+    for (NonnullPair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
       final QueryableIndex queryableIndex = pair.lhs;
       final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
 
@@ -786,13 +786,13 @@ public class CompactionTask extends AbstractBatchIndexTask
     return new DimensionsSpec(dimensionSchemas, null, null);
   }
 
-  private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
+  private static List<NonnullPair<QueryableIndex, DataSegment>> loadSegments(
       List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
       Map<DataSegment, File> segmentFileMap,
       IndexIO indexIO
   ) throws IOException
   {
-    final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
+    final List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
 
     for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timelineObjectHolders) {
       final PartitionHolder<DataSegment> partitionHolder = timelineObjectHolder.getObject();
@@ -801,7 +801,7 @@ public class CompactionTask extends AbstractBatchIndexTask
         final QueryableIndex queryableIndex = indexIO.loadIndex(
             Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId())
         );
-        segments.add(Pair.of(queryableIndex, segment));
+        segments.add(new NonnullPair<>(queryableIndex, segment));
       }
     }
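
The most involved change above is the CompactionTask loop that unifies overlapping intervals, so that overlapping segments are compacted under the same indexSpec. Below is a simplified, self-contained sketch of that merge logic, using [start, end) millisecond ranges and String payloads in place of Druid's Joda Interval and NonnullPair<QueryableIndex, DataSegment> pairs; every name in the sketch is illustrative, not a Druid API.

    import java.util.ArrayList;
    import java.util.List;

    public class IntervalUnionSketch
    {
      // One unified group: the merged range plus every payload it covers.
      static final class Group
      {
        long start;
        long end;
        final List<String> payloads = new ArrayList<>();

        Group(long start, long end, List<String> payloads)
        {
          this.start = start;
          this.end = end;
          this.payloads.addAll(payloads);
        }
      }

      // 'ranges' must be sorted by start then end, mirroring the TreeMap
      // ordered by Comparators.intervalsByStartThenEnd() in the diff.
      static List<Group> unify(List<long[]> ranges, List<List<String>> payloads)
      {
        final List<Group> unified = new ArrayList<>();
        Group union = null;
        for (int i = 0; i < ranges.size(); i++) {
          final long curStart = ranges.get(i)[0];
          final long curEnd = ranges.get(i)[1];
          if (union == null) {
            union = new Group(curStart, curEnd, payloads.get(i));
          } else if (curStart < union.end) {
            // Overlap: extend the union and keep accumulating payloads.
            union.end = Math.max(union.end, curEnd);
            union.payloads.addAll(payloads.get(i));
          } else {
            // Gap: flush the finished union and start a new one.
            unified.add(union);
            union = new Group(curStart, curEnd, payloads.get(i));
          }
        }
        if (union != null) { // the diff adds the final union unconditionally
          unified.add(union);
        }
        return unified;
      }
    }

For example, sorted inputs [0, 10), [5, 20), [30, 40) collapse to two groups, [0, 20) and [30, 40), with the first group holding the payloads of both overlapping inputs.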
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
