This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new eccdec9139 Reduce interval creation cost for segment cost computation
(#12670)
eccdec9139 is described below
commit eccdec9139c6946e26dbebe9be35f7c9f9cb521c
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Jun 21 17:39:43 2022 +0530
Reduce interval creation cost for segment cost computation (#12670)
Changes:
- Reuse created interval in `SegmentId.getInterval()`
- Intern intervals to reduce the memory footprint
---
.../java/org/apache/druid/timeline/SegmentId.java | 40 +++++++++-------------
.../StreamAppenderatorDriverFailTest.java | 4 +--
.../coordination/SegmentLoadDropHandlerTest.java | 10 ++++--
3 files changed, 26 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentId.java
b/core/src/main/java/org/apache/druid/timeline/SegmentId.java
index 8430524021..986d9aafe2 100644
--- a/core/src/main/java/org/apache/druid/timeline/SegmentId.java
+++ b/core/src/main/java/org/apache/druid/timeline/SegmentId.java
@@ -33,7 +33,6 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.timeline.partition.ShardSpec;
-import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -80,6 +79,12 @@ public final class SegmentId implements Comparable<SegmentId>
*/
private static final Interner<String> STRING_INTERNER =
Interners.newWeakInterner();
+ /**
+ * Store Intervals since creating them each time before returning is an
expensive operation
+ * To decrease the memory required for storing intervals, intern them, since
the number of distinct values is "low"
+ */
+ private static final Interner<Interval> INTERVAL_INTERNER =
Interners.newWeakInterner();
+
private static final char DELIMITER = '_';
private static final Splitter DELIMITER_SPLITTER = Splitter.on(DELIMITER);
private static final Joiner DELIMITER_JOINER = Joiner.on(DELIMITER);
@@ -258,14 +263,7 @@ public final class SegmentId implements
Comparable<SegmentId>
}
private final String dataSource;
- /**
- * {@code intervalStartMillis}, {@link #intervalEndMillis} and {@link
#intervalChronology} are the three fields of
- * an {@link Interval}. Storing them directly to flatten the structure and
reduce the heap space consumption.
- */
- private final long intervalStartMillis;
- private final long intervalEndMillis;
- @Nullable
- private final Chronology intervalChronology;
+ private final Interval interval;
private final String version;
private final int partitionNum;
@@ -278,9 +276,7 @@ public final class SegmentId implements
Comparable<SegmentId>
private SegmentId(String dataSource, Interval interval, String version, int
partitionNum)
{
this.dataSource =
STRING_INTERNER.intern(Objects.requireNonNull(dataSource));
- this.intervalStartMillis = interval.getStartMillis();
- this.intervalEndMillis = interval.getEndMillis();
- this.intervalChronology = interval.getChronology();
+ this.interval = INTERVAL_INTERNER.intern(Objects.requireNonNull(interval));
// Versions are timestamp-based Strings, interning of them doesn't make
sense. If this is not the case, interning
// could be conditionally allowed via a system property.
this.version = Objects.requireNonNull(version);
@@ -297,9 +293,7 @@ public final class SegmentId implements
Comparable<SegmentId>
hashCode = hashCode * 1000003 + version.hashCode();
hashCode = hashCode * 1000003 + dataSource.hashCode();
- hashCode = hashCode * 1000003 + Long.hashCode(intervalStartMillis);
- hashCode = hashCode * 1000003 + Long.hashCode(intervalEndMillis);
- hashCode = hashCode * 1000003 + Objects.hashCode(intervalChronology);
+ hashCode = hashCode * 1000003 + interval.hashCode();
return hashCode;
}
@@ -310,17 +304,17 @@ public final class SegmentId implements
Comparable<SegmentId>
public DateTime getIntervalStart()
{
- return new DateTime(intervalStartMillis, intervalChronology);
+ return new DateTime(interval.getStartMillis(), interval.getChronology());
}
public DateTime getIntervalEnd()
{
- return new DateTime(intervalEndMillis, intervalChronology);
+ return new DateTime(interval.getEndMillis(), interval.getChronology());
}
public Interval getInterval()
{
- return new Interval(intervalStartMillis, intervalEndMillis,
intervalChronology);
+ return interval;
}
public String getVersion()
@@ -340,7 +334,7 @@ public final class SegmentId implements
Comparable<SegmentId>
public SegmentDescriptor toDescriptor()
{
- return new SegmentDescriptor(Intervals.utc(intervalStartMillis,
intervalEndMillis), version, partitionNum);
+ return new SegmentDescriptor(Intervals.utc(interval.getStartMillis(),
interval.getEndMillis()), version, partitionNum);
}
@Override
@@ -357,9 +351,7 @@ public final class SegmentId implements
Comparable<SegmentId>
// are equal as well as all other fields used to compute them, the
partitionNums are also guaranteed to be equal.
return hashCode == that.hashCode &&
dataSource.equals(that.dataSource) &&
- intervalStartMillis == that.intervalStartMillis &&
- intervalEndMillis == that.intervalEndMillis &&
- Objects.equals(intervalChronology, that.intervalChronology) &&
+ interval.equals(that.interval) &&
version.equals(that.version);
}
@@ -376,11 +368,11 @@ public final class SegmentId implements
Comparable<SegmentId>
if (result != 0) {
return result;
}
- result = Long.compare(intervalStartMillis, o.intervalStartMillis);
+ result = Long.compare(interval.getStartMillis(),
o.interval.getStartMillis());
if (result != 0) {
return result;
}
- result = Long.compare(intervalEndMillis, o.intervalEndMillis);
+ result = Long.compare(interval.getEndMillis(), o.interval.getEndMillis());
if (result != 0) {
return result;
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 4f9cd3c341..1e2aa7d73e 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -60,10 +60,10 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -355,7 +355,7 @@ public class StreamAppenderatorDriverFailTest extends
EasyMockSupport
private static class FailableAppenderator implements Appenderator
{
- private final Map<SegmentIdWithShardSpec, List<InputRow>> rows = new
HashMap<>();
+ private final Map<SegmentIdWithShardSpec, List<InputRow>> rows = new
TreeMap<>();
private boolean dropEnabled = true;
private boolean persistEnabled = true;
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 492b788c54..31f39c0f28 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -57,8 +57,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
@@ -501,9 +503,13 @@ public class SegmentLoadDropHandlerTest
ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>
future = segmentLoadDropHandler
.processBatch(batch);
+ Map<DataSegmentChangeRequest, SegmentLoadDropHandler.Status>
expectedStatusMap = new HashMap<>();
+ expectedStatusMap.put(batch.get(0), SegmentLoadDropHandler.Status.PENDING);
+ expectedStatusMap.put(batch.get(1), SegmentLoadDropHandler.Status.SUCCESS);
List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result =
future.get();
- Assert.assertEquals(SegmentLoadDropHandler.Status.PENDING,
result.get(0).getStatus());
- Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS,
result.get(1).getStatus());
+ for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus
requestAndStatus : result) {
+
Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()),
requestAndStatus.getStatus());
+ }
for (Runnable runnable : scheduledRunnable) {
runnable.run();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]