jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891571
########## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java ########## @@ -21,80 +21,32 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import io.druid.indexer.TaskLocation; -import io.druid.indexer.TaskStatus; -import io.druid.indexing.common.RetryPolicy; -import io.druid.indexing.common.RetryPolicyConfig; -import io.druid.indexing.common.RetryPolicyFactory; +import io.druid.indexing.common.IndexTaskClient; import io.druid.indexing.common.TaskInfoProvider; -import io.druid.java.util.common.IAE; -import io.druid.java.util.common.IOE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; -import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.jackson.JacksonUtils; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.http.client.HttpClient; -import io.druid.java.util.http.client.Request; -import io.druid.java.util.http.client.response.FullResponseHandler; import io.druid.java.util.http.client.response.FullResponseHolder; -import io.druid.segment.realtime.firehose.ChatHandlerResource; -import org.jboss.netty.channel.ChannelException; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; import org.joda.time.Duration; -import org.joda.time.Period; -import javax.ws.rs.core.MediaType; +import javax.annotation.Nullable; import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.Callable; -public class KafkaIndexTaskClient +public class KafkaIndexTaskClient extends IndexTaskClient { - public static class NoTaskLocationException extends RuntimeException - { - public NoTaskLocationException(String message) - { - super(message); - } - } - - public static class TaskNotRunnableException extends RuntimeException - { - public TaskNotRunnableException(String message) - { - super(message); - } - } - - public static final int MAX_RETRY_WAIT_SECONDS = 10; - - private static final int MIN_RETRY_WAIT_SECONDS = 2; private static final EmittingLogger log = new EmittingLogger(KafkaIndexTaskClient.class); - private static final String BASE_PATH = "/druid/worker/v1/chat"; - private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5; - private static final TreeMap EMPTY_TREE_MAP = new TreeMap(); - - private final HttpClient httpClient; - private final ObjectMapper jsonMapper; - private final TaskInfoProvider taskInfoProvider; - private final Duration httpTimeout; - private final RetryPolicyFactory retryPolicyFactory; - private final ListeningExecutorService executorService; - private final long numRetries; + private static final TreeMap<Integer, Map<Integer, Long>> EMPTY_TREE_MAP = new TreeMap<>(); Review comment: Well, we can wrap this with `Collections.unmodifiableSortedMap()`. However, this requires to change all types of `checkpointOffset`s in KafkaSupervisor which is currently `TreeMap`. I don't think it's trivial change. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org