This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c591ff8 Add NonnullPair (#10013)
c591ff8 is described below
commit c591ff8ea8f4c2051c2c4721b2b6b82bea54d107
Author: Jihoon Son <[email protected]>
AuthorDate: Fri Jun 26 09:52:06 2020 -0700
Add NonnullPair (#10013)
* Add NonnullPair
* new line
* test
* make it consistent
---
.../apache/druid/java/util/common/NonnullPair.java | 65 ++++++++++++++++++++++
.../druid/java/util/common/NonnullPairTest.java | 55 ++++++++++++++++++
.../druid/indexing/common/task/CompactionTask.java | 58 +++++++++----------
3 files changed, 149 insertions(+), 29 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/NonnullPair.java
b/core/src/main/java/org/apache/druid/java/util/common/NonnullPair.java
new file mode 100644
index 0000000..71c7e30
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/common/NonnullPair.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+public class NonnullPair<L, R>
+{
+ public final L lhs;
+ public final R rhs;
+
+ public NonnullPair(L lhs, R rhs)
+ {
+ this.lhs = Preconditions.checkNotNull(lhs, "lhs");
+ this.rhs = Preconditions.checkNotNull(rhs, "rhs");
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ NonnullPair<?, ?> that = (NonnullPair<?, ?>) o;
+ return Objects.equals(lhs, that.lhs) &&
+ Objects.equals(rhs, that.rhs);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(lhs, rhs);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "NonnullPair{" +
+ "lhs=" + lhs +
+ ", rhs=" + rhs +
+ '}';
+ }
+}
diff --git
a/core/src/test/java/org/apache/druid/java/util/common/NonnullPairTest.java
b/core/src/test/java/org/apache/druid/java/util/common/NonnullPairTest.java
new file mode 100644
index 0000000..354ec62
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/common/NonnullPairTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class NonnullPairTest
+{
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(NonnullPair.class).withNonnullFields("lhs",
"rhs").usingGetClass().verify();
+ }
+
+ @Test
+ public void testConstructorWithNull()
+ {
+ expectedException.expect(NullPointerException.class);
+ expectedException.expectMessage("lhs");
+ //noinspection ResultOfObjectAllocationIgnored
+ new NonnullPair<>(null, "nullTest");
+ }
+
+ @Test
+ public void testGets()
+ {
+ final NonnullPair<Integer, Integer> pair = new NonnullPair<>(20, 30);
+ Assert.assertEquals(20, pair.lhs.intValue());
+ Assert.assertEquals(30, pair.rhs.intValue());
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 3ce3f31..5cfaf32 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -64,7 +64,7 @@ import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -103,6 +103,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -494,7 +495,7 @@ public class CompactionTask extends AbstractBatchIndexTask
final RetryPolicyFactory retryPolicyFactory
) throws IOException, SegmentLoadingException
{
- Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String,
DataSegment>>> pair = prepareSegments(
+ NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String,
DataSegment>>> pair = prepareSegments(
toolbox,
segmentProvider,
lockGranularityInUse
@@ -508,7 +509,7 @@ public class CompactionTask extends AbstractBatchIndexTask
// find metadata for interval
// queryableIndexAndSegments is sorted by the interval of the dataSegment
- final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments =
loadSegments(
+ final List<NonnullPair<QueryableIndex, DataSegment>>
queryableIndexAndSegments = loadSegments(
timelineSegments,
segmentFileMap,
toolbox.getIndexIO()
@@ -518,20 +519,20 @@ public class CompactionTask extends AbstractBatchIndexTask
if (segmentGranularity == null) {
// original granularity
- final Map<Interval, List<Pair<QueryableIndex, DataSegment>>>
intervalToSegments = new TreeMap<>(
+ final Map<Interval, List<NonnullPair<QueryableIndex, DataSegment>>>
intervalToSegments = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
- //noinspection ConstantConditions
queryableIndexAndSegments.forEach(
p -> intervalToSegments.computeIfAbsent(p.rhs.getInterval(), k ->
new ArrayList<>())
.add(p)
);
// unify overlapping intervals to ensure overlapping segments compacting
in the same indexSpec
- List<Pair<Interval, List<Pair<QueryableIndex, DataSegment>>>>
intervalToSegmentsUnified = new ArrayList<>();
+ List<NonnullPair<Interval, List<NonnullPair<QueryableIndex,
DataSegment>>>> intervalToSegmentsUnified =
+ new ArrayList<>();
Interval union = null;
- List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
- for (Map.Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry
: intervalToSegments.entrySet()) {
+ List<NonnullPair<QueryableIndex, DataSegment>> segments = new
ArrayList<>();
+ for (Entry<Interval, List<NonnullPair<QueryableIndex, DataSegment>>>
entry : intervalToSegments.entrySet()) {
Interval cur = entry.getKey();
if (union == null) {
union = cur;
@@ -540,24 +541,23 @@ public class CompactionTask extends AbstractBatchIndexTask
union = Intervals.utc(union.getStartMillis(),
Math.max(union.getEndMillis(), cur.getEndMillis()));
segments.addAll(entry.getValue());
} else {
- intervalToSegmentsUnified.add(Pair.of(union, segments));
+ intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));
union = cur;
segments = new ArrayList<>(entry.getValue());
}
}
- intervalToSegmentsUnified.add(Pair.of(union, segments));
+ intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));
final List<ParallelIndexIngestionSpec> specs = new
ArrayList<>(intervalToSegmentsUnified.size());
- for (Pair<Interval, List<Pair<QueryableIndex, DataSegment>>> entry :
intervalToSegmentsUnified) {
+ for (NonnullPair<Interval, List<NonnullPair<QueryableIndex,
DataSegment>>> entry : intervalToSegmentsUnified) {
final Interval interval = entry.lhs;
- final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact =
entry.rhs;
+ final List<NonnullPair<QueryableIndex, DataSegment>> segmentsToCompact
= entry.rhs;
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentsToCompact,
dimensionsSpec,
metricsSpec,
-
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity(),
- jsonMapper
+
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity()
);
specs.add(
@@ -584,8 +584,7 @@ public class CompactionTask extends AbstractBatchIndexTask
queryableIndexAndSegments,
dimensionsSpec,
metricsSpec,
- segmentGranularity,
- jsonMapper
+ segmentGranularity
);
return Collections.singletonList(
@@ -633,7 +632,7 @@ public class CompactionTask extends AbstractBatchIndexTask
);
}
- private static Pair<Map<DataSegment, File>,
List<TimelineObjectHolder<String, DataSegment>>> prepareSegments(
+ private static NonnullPair<Map<DataSegment, File>,
List<TimelineObjectHolder<String, DataSegment>>> prepareSegments(
TaskToolbox toolbox,
SegmentProvider segmentProvider,
LockGranularity lockGranularityInUse
@@ -645,20 +644,19 @@ public class CompactionTask extends AbstractBatchIndexTask
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments =
VersionedIntervalTimeline
.forSegments(usedSegments)
.lookup(segmentProvider.interval);
- return Pair.of(segmentFileMap, timelineSegments);
+ return new NonnullPair<>(segmentFileMap, timelineSegments);
}
private static DataSchema createDataSchema(
String dataSource,
- List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
+ List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable AggregatorFactory[] metricsSpec,
- Granularity segmentGranularity,
- ObjectMapper jsonMapper
+ Granularity segmentGranularity
)
{
// check index metadata
- for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
+ for (NonnullPair<QueryableIndex, DataSegment> pair :
queryableIndexAndSegments) {
final QueryableIndex index = pair.lhs;
if (index.getMetadata() == null) {
throw new RE("Index metadata doesn't exist for segment[%s]",
pair.rhs.getId());
@@ -703,7 +701,7 @@ public class CompactionTask extends AbstractBatchIndexTask
}
private static AggregatorFactory[] createMetricsSpec(
- List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
+ List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
)
{
final List<AggregatorFactory[]> aggregatorFactories =
queryableIndexAndSegments
@@ -725,7 +723,7 @@ public class CompactionTask extends AbstractBatchIndexTask
.toArray(AggregatorFactory[]::new);
}
- private static DimensionsSpec createDimensionsSpec(List<Pair<QueryableIndex,
DataSegment>> queryableIndices)
+ private static DimensionsSpec
createDimensionsSpec(List<NonnullPair<QueryableIndex, DataSegment>>
queryableIndices)
{
final BiMap<String, Integer> uniqueDims = HashBiMap.create();
final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
@@ -736,10 +734,12 @@ public class CompactionTask extends AbstractBatchIndexTask
// frequently, and thus the performance should be optimized for recent
ones rather than old ones.
// sort timelineSegments in order of interval, see
https://github.com/apache/druid/pull/9905
- queryableIndices.sort((o1, o2) ->
Comparators.intervalsByStartThenEnd().compare(o1.rhs.getInterval(),
o2.rhs.getInterval()));
+ queryableIndices.sort(
+ (o1, o2) ->
Comparators.intervalsByStartThenEnd().compare(o1.rhs.getInterval(),
o2.rhs.getInterval())
+ );
int index = 0;
- for (Pair<QueryableIndex, DataSegment> pair :
Lists.reverse(queryableIndices)) {
+ for (NonnullPair<QueryableIndex, DataSegment> pair :
Lists.reverse(queryableIndices)) {
final QueryableIndex queryableIndex = pair.lhs;
final Map<String, DimensionHandler> dimensionHandlerMap =
queryableIndex.getDimensionHandlers();
@@ -786,13 +786,13 @@ public class CompactionTask extends AbstractBatchIndexTask
return new DimensionsSpec(dimensionSchemas, null, null);
}
- private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
+ private static List<NonnullPair<QueryableIndex, DataSegment>> loadSegments(
List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
Map<DataSegment, File> segmentFileMap,
IndexIO indexIO
) throws IOException
{
- final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
+ final List<NonnullPair<QueryableIndex, DataSegment>> segments = new
ArrayList<>();
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder :
timelineObjectHolders) {
final PartitionHolder<DataSegment> partitionHolder =
timelineObjectHolder.getObject();
@@ -801,7 +801,7 @@ public class CompactionTask extends AbstractBatchIndexTask
final QueryableIndex queryableIndex = indexIO.loadIndex(
Preconditions.checkNotNull(segmentFileMap.get(segment), "File for
segment %s", segment.getId())
);
- segments.add(Pair.of(queryableIndex, segment));
+ segments.add(new NonnullPair<>(queryableIndex, segment));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]