This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new eccdec9139 Reduce interval creation cost for segment cost computation (#12670)
eccdec9139 is described below

commit eccdec9139c6946e26dbebe9be35f7c9f9cb521c
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Jun 21 17:39:43 2022 +0530

    Reduce interval creation cost for segment cost computation (#12670)
    
    Changes:
    - Reuse created interval in `SegmentId.getInterval()`
    - Intern intervals to save on memory footprint
---
 .../java/org/apache/druid/timeline/SegmentId.java  | 40 +++++++++-------------
 .../StreamAppenderatorDriverFailTest.java          |  4 +--
 .../coordination/SegmentLoadDropHandlerTest.java   | 10 ++++--
 3 files changed, 26 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentId.java b/core/src/main/java/org/apache/druid/timeline/SegmentId.java
index 8430524021..986d9aafe2 100644
--- a/core/src/main/java/org/apache/druid/timeline/SegmentId.java
+++ b/core/src/main/java/org/apache/druid/timeline/SegmentId.java
@@ -33,7 +33,6 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.timeline.partition.ShardSpec;
-import org.joda.time.Chronology;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -80,6 +79,12 @@ public final class SegmentId implements Comparable<SegmentId>
    */
   private static final Interner<String> STRING_INTERNER = 
Interners.newWeakInterner();
 
+  /**
+   * Store Intervals since creating them each time before returning is an 
expensive operation
+   * To decrease the memory required for storing intervals, intern them, since 
the number of distinct values is "low"
+   */
+  private static final Interner<Interval> INTERVAL_INTERNER = 
Interners.newWeakInterner();
+
   private static final char DELIMITER = '_';
   private static final Splitter DELIMITER_SPLITTER = Splitter.on(DELIMITER);
   private static final Joiner DELIMITER_JOINER = Joiner.on(DELIMITER);
@@ -258,14 +263,7 @@ public final class SegmentId implements 
Comparable<SegmentId>
   }
 
   private final String dataSource;
-  /**
-   * {@code intervalStartMillis}, {@link #intervalEndMillis} and {@link 
#intervalChronology} are the three fields of
-   * an {@link Interval}. Storing them directly to flatten the structure and 
reduce the heap space consumption.
-   */
-  private final long intervalStartMillis;
-  private final long intervalEndMillis;
-  @Nullable
-  private final Chronology intervalChronology;
+  private final Interval interval;
   private final String version;
   private final int partitionNum;
 
@@ -278,9 +276,7 @@ public final class SegmentId implements 
Comparable<SegmentId>
   private SegmentId(String dataSource, Interval interval, String version, int 
partitionNum)
   {
     this.dataSource = 
STRING_INTERNER.intern(Objects.requireNonNull(dataSource));
-    this.intervalStartMillis = interval.getStartMillis();
-    this.intervalEndMillis = interval.getEndMillis();
-    this.intervalChronology = interval.getChronology();
+    this.interval = INTERVAL_INTERNER.intern(Objects.requireNonNull(interval));
     // Versions are timestamp-based Strings, interning of them doesn't make 
sense. If this is not the case, interning
     // could be conditionally allowed via a system property.
     this.version = Objects.requireNonNull(version);
@@ -297,9 +293,7 @@ public final class SegmentId implements 
Comparable<SegmentId>
     hashCode = hashCode * 1000003 + version.hashCode();
 
     hashCode = hashCode * 1000003 + dataSource.hashCode();
-    hashCode = hashCode * 1000003 + Long.hashCode(intervalStartMillis);
-    hashCode = hashCode * 1000003 + Long.hashCode(intervalEndMillis);
-    hashCode = hashCode * 1000003 + Objects.hashCode(intervalChronology);
+    hashCode = hashCode * 1000003 + interval.hashCode();
     return hashCode;
   }
 
@@ -310,17 +304,17 @@ public final class SegmentId implements 
Comparable<SegmentId>
 
   public DateTime getIntervalStart()
   {
-    return new DateTime(intervalStartMillis, intervalChronology);
+    return new DateTime(interval.getStartMillis(), interval.getChronology());
   }
 
   public DateTime getIntervalEnd()
   {
-    return new DateTime(intervalEndMillis, intervalChronology);
+    return new DateTime(interval.getEndMillis(), interval.getChronology());
   }
 
   public Interval getInterval()
   {
-    return new Interval(intervalStartMillis, intervalEndMillis, 
intervalChronology);
+    return interval;
   }
 
   public String getVersion()
@@ -340,7 +334,7 @@ public final class SegmentId implements 
Comparable<SegmentId>
 
   public SegmentDescriptor toDescriptor()
   {
-    return new SegmentDescriptor(Intervals.utc(intervalStartMillis, 
intervalEndMillis), version, partitionNum);
+    return new SegmentDescriptor(Intervals.utc(interval.getStartMillis(), 
interval.getEndMillis()), version, partitionNum);
   }
 
   @Override
@@ -357,9 +351,7 @@ public final class SegmentId implements 
Comparable<SegmentId>
     // are equal as well as all other fields used to compute them, the 
partitionNums are also guaranteed to be equal.
     return hashCode == that.hashCode &&
            dataSource.equals(that.dataSource) &&
-           intervalStartMillis == that.intervalStartMillis &&
-           intervalEndMillis == that.intervalEndMillis &&
-           Objects.equals(intervalChronology, that.intervalChronology) &&
+           interval.equals(that.interval) &&
            version.equals(that.version);
   }
 
@@ -376,11 +368,11 @@ public final class SegmentId implements 
Comparable<SegmentId>
     if (result != 0) {
       return result;
     }
-    result = Long.compare(intervalStartMillis, o.intervalStartMillis);
+    result = Long.compare(interval.getStartMillis(), 
o.interval.getStartMillis());
     if (result != 0) {
       return result;
     }
-    result = Long.compare(intervalEndMillis, o.intervalEndMillis);
+    result = Long.compare(interval.getEndMillis(), o.interval.getEndMillis());
     if (result != 0) {
       return result;
     }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 4f9cd3c341..1e2aa7d73e 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -60,10 +60,10 @@ import org.junit.rules.ExpectedException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -355,7 +355,7 @@ public class StreamAppenderatorDriverFailTest extends 
EasyMockSupport
 
   private static class FailableAppenderator implements Appenderator
   {
-    private final Map<SegmentIdWithShardSpec, List<InputRow>> rows = new 
HashMap<>();
+    private final Map<SegmentIdWithShardSpec, List<InputRow>> rows = new 
TreeMap<>();
 
     private boolean dropEnabled = true;
     private boolean persistEnabled = true;
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 492b788c54..31f39c0f28 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -57,8 +57,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledExecutorService;
@@ -501,9 +503,13 @@ public class SegmentLoadDropHandlerTest
     
ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>
 future = segmentLoadDropHandler
         .processBatch(batch);
 
+    Map<DataSegmentChangeRequest, SegmentLoadDropHandler.Status> 
expectedStatusMap = new HashMap<>();
+    expectedStatusMap.put(batch.get(0), SegmentLoadDropHandler.Status.PENDING);
+    expectedStatusMap.put(batch.get(1), SegmentLoadDropHandler.Status.SUCCESS);
     List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result = 
future.get();
-    Assert.assertEquals(SegmentLoadDropHandler.Status.PENDING, 
result.get(0).getStatus());
-    Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, 
result.get(1).getStatus());
+    for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus 
requestAndStatus : result) {
+      
Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()), 
requestAndStatus.getStatus());
+    }
 
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to