cryptoe commented on code in PR #12696:
URL: https://github.com/apache/druid/pull/12696#discussion_r912896599


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskClient.java:
##########
@@ -19,122 +19,28 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.indexing.common.IndexTaskClient;
-import org.apache.druid.indexing.common.TaskInfoProvider;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.http.client.HttpClient;
-import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.DateTime;
-import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
-public class ParallelIndexSupervisorTaskClient extends IndexTaskClient
+public interface ParallelIndexSupervisorTaskClient
 {
-  ParallelIndexSupervisorTaskClient(
-      HttpClient httpClient,
-      ObjectMapper objectMapper,
-      TaskInfoProvider taskInfoProvider,
-      Duration httpTimeout,
-      String callerId,
-      long numRetries
-  )
-  {
-    super(httpClient, objectMapper, taskInfoProvider, httpTimeout, callerId, 
1, numRetries);
-  }
-
   /**
    * See {@link SinglePhaseParallelIndexTaskRunner#allocateNewSegment(String, 
DateTime)}.
    */
   @Deprecated
-  public SegmentIdWithShardSpec allocateSegment(String supervisorTaskId, 
DateTime timestamp) throws IOException
-  {
-    final StringFullResponseHolder response = submitSmileRequest(
-        supervisorTaskId,
-        HttpMethod.POST,
-        "segment/allocate",
-        null,
-        serialize(timestamp),
-        true
-    );
-    if (!isSuccess(response)) {
-      throw new ISE(
-          "task[%s] failed to allocate a new segment identifier with the HTTP 
code[%d] and content[%s]",
-          supervisorTaskId,
-          response.getStatus().getCode(),
-          response.getContent()
-      );
-    } else {
-      return deserialize(
-          response.getContent(),
-          new TypeReference<SegmentIdWithShardSpec>()
-          {
-          }
-      );
-    }
-  }
+  SegmentIdWithShardSpec allocateSegment(DateTime timestamp) throws 
IOException;
 
   /**
    * See {@link SinglePhaseParallelIndexTaskRunner#allocateNewSegment(String, 
DateTime, String, String)}.
    */
-  public SegmentIdWithShardSpec allocateSegment(
-      String supervisorTaskId,
+  SegmentIdWithShardSpec allocateSegment(
       DateTime timestamp,
       String sequenceName,
       @Nullable String prevSegmentId
-  ) throws IOException
-  {
-    final StringFullResponseHolder response = submitSmileRequest(
-        supervisorTaskId,
-        HttpMethod.POST,
-        "segment/allocate",
-        null,
-        serialize(new SegmentAllocationRequest(timestamp, sequenceName, 
prevSegmentId)),
-        true
-    );
-    if (!isSuccess(response)) {
-      throw new ISE(
-          "task[%s] failed to allocate a new segment identifier with the HTTP 
code[%d] and content[%s]",
-          supervisorTaskId,
-          response.getStatus().getCode(),
-          response.getContent()
-      );
-    } else {
-      return deserialize(
-          response.getContent(),
-          new TypeReference<SegmentIdWithShardSpec>()
-          {
-          }
-      );
-    }
-  }
+  ) throws IOException;
 
-  public void report(String supervisorTaskId, SubTaskReport report)
-  {
-    try {
-      final StringFullResponseHolder response = submitSmileRequest(
-          supervisorTaskId,
-          HttpMethod.POST,
-          "report",
-          null,
-          serialize(report),
-          true
-      );
-      if (!isSuccess(response)) {
-        throw new ISE(
-            "Failed to send taskReports to task[%s] with the HTTP code [%d]",
-            supervisorTaskId,
-            response.getStatus().getCode()
-        );
-      }
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
+  void report(SubTaskReport report);

Review Comment:
   Super Nit: as this is a interface, IMHO we should add documentation to this 
method 



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskClient.java:
##########
@@ -19,122 +19,28 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.indexing.common.IndexTaskClient;
-import org.apache.druid.indexing.common.TaskInfoProvider;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.http.client.HttpClient;
-import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.DateTime;
-import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
-public class ParallelIndexSupervisorTaskClient extends IndexTaskClient
+public interface ParallelIndexSupervisorTaskClient
 {
-  ParallelIndexSupervisorTaskClient(
-      HttpClient httpClient,
-      ObjectMapper objectMapper,
-      TaskInfoProvider taskInfoProvider,
-      Duration httpTimeout,
-      String callerId,
-      long numRetries
-  )
-  {
-    super(httpClient, objectMapper, taskInfoProvider, httpTimeout, callerId, 
1, numRetries);
-  }
-
   /**
    * See {@link SinglePhaseParallelIndexTaskRunner#allocateNewSegment(String, 
DateTime)}.
    */
   @Deprecated
-  public SegmentIdWithShardSpec allocateSegment(String supervisorTaskId, 
DateTime timestamp) throws IOException
-  {
-    final StringFullResponseHolder response = submitSmileRequest(
-        supervisorTaskId,
-        HttpMethod.POST,
-        "segment/allocate",
-        null,
-        serialize(timestamp),
-        true
-    );
-    if (!isSuccess(response)) {
-      throw new ISE(
-          "task[%s] failed to allocate a new segment identifier with the HTTP 
code[%d] and content[%s]",
-          supervisorTaskId,
-          response.getStatus().getCode(),
-          response.getContent()
-      );
-    } else {
-      return deserialize(
-          response.getContent(),
-          new TypeReference<SegmentIdWithShardSpec>()
-          {
-          }
-      );
-    }
-  }
+  SegmentIdWithShardSpec allocateSegment(DateTime timestamp) throws 
IOException;
 
   /**
    * See {@link SinglePhaseParallelIndexTaskRunner#allocateNewSegment(String, 
DateTime, String, String)}.
    */
-  public SegmentIdWithShardSpec allocateSegment(
-      String supervisorTaskId,
+  SegmentIdWithShardSpec allocateSegment(
       DateTime timestamp,
       String sequenceName,
       @Nullable String prevSegmentId
-  ) throws IOException
-  {
-    final StringFullResponseHolder response = submitSmileRequest(
-        supervisorTaskId,
-        HttpMethod.POST,
-        "segment/allocate",
-        null,
-        serialize(new SegmentAllocationRequest(timestamp, sequenceName, 
prevSegmentId)),
-        true
-    );
-    if (!isSuccess(response)) {
-      throw new ISE(
-          "task[%s] failed to allocate a new segment identifier with the HTTP 
code[%d] and content[%s]",
-          supervisorTaskId,
-          response.getStatus().getCode(),
-          response.getContent()
-      );
-    } else {
-      return deserialize(
-          response.getContent(),
-          new TypeReference<SegmentIdWithShardSpec>()
-          {
-          }
-      );
-    }
-  }
+  ) throws IOException;
 
-  public void report(String supervisorTaskId, SubTaskReport report)
-  {
-    try {
-      final StringFullResponseHolder response = submitSmileRequest(
-          supervisorTaskId,
-          HttpMethod.POST,
-          "report",
-          null,
-          serialize(report),
-          true
-      );
-      if (!isSuccess(response)) {
-        throw new ISE(
-            "Failed to send taskReports to task[%s] with the HTTP code [%d]",
-            supervisorTaskId,
-            response.getStatus().getCode()
-        );
-      }
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
+  void report(SubTaskReport report);

Review Comment:
   Super Nit: as this is an interface, IMHO we should add documentation to this 
method 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to