This is an automated email from the ASF dual-hosted git repository. cwylie pushed a commit to branch 0.14.1-incubating in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit ffcc3df0a07ee8b313b2081bec8cb10647b92eb6 Author: Jihoon Son <[email protected]> AuthorDate: Sat Apr 20 18:08:34 2019 -0700 Fix encoded taskId check in chatHandlerResource (#7520) * Fix encoded taskId check in chatHandlerResource * fix tests --- .../indexing/kafka/KafkaIndexTaskClientTest.java | 7 ++-- .../kinesis/KinesisIndexTaskClientTest.java | 7 ++-- .../druid/indexing/common/IndexTaskClient.java | 2 +- .../parallel/ParallelIndexSupervisorTask.java | 6 ++-- .../realtime/firehose/ChatHandlerResource.java | 20 ++++++++--- .../initialization/jetty/BadRequestException.java | 28 +++++++++++++++ .../jetty/BadRequestExceptionMapper.java | 41 ++++++++++++++++++++++ .../initialization/jetty/JettyServerModule.java | 1 + 8 files changed, 97 insertions(+), 15 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java index 28cc0bb..f0178b8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java @@ -184,9 +184,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport public void testInternalServerError() { expectedException.expect(RuntimeException.class); - expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500]"); + expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500] and content []"); expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2); + expect(responseHolder.getContent()).andReturn(""); expect( httpClient.go( EasyMock.anyObject(Request.class), @@ -231,7 +232,7 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3) .andReturn(HttpResponseStatus.OK); expect(responseHolder.getResponse()).andReturn(response); - expect(responseHolder.getContent()).andReturn("") + expect(responseHolder.getContent()).andReturn("").times(2) .andReturn("{}"); expect(response.headers()).andReturn(headers); expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id"); @@ -291,7 +292,7 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport Capture<Request> captured = Capture.newInstance(CaptureType.ALL); expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6) .andReturn(HttpResponseStatus.OK).times(1); - expect(responseHolder.getContent()).andReturn("").times(2) + expect(responseHolder.getContent()).andReturn("").times(4) .andReturn("{\"0\":1, \"1\":10}"); expect(responseHolder.getResponse()).andReturn(response).times(2); expect(response.headers()).andReturn(headers).times(2); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java index 9d7fafb..9ac25f5 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java @@ -185,9 +185,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport public void testInternalServerError() { expectedException.expect(RuntimeException.class); - expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500]"); + expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500] and content []"); expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2); + expect(responseHolder.getContent()).andReturn(""); expect( httpClient.go( EasyMock.anyObject(Request.class), @@ -232,7 +233,7 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3) .andReturn(HttpResponseStatus.OK); expect(responseHolder.getResponse()).andReturn(response); - expect(responseHolder.getContent()).andReturn("") + expect(responseHolder.getContent()).andReturn("").times(2) .andReturn("{}"); expect(response.headers()).andReturn(headers); expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id"); @@ -292,7 +293,7 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport Capture<Request> captured = Capture.newInstance(CaptureType.ALL); expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6) .andReturn(HttpResponseStatus.OK).times(1); - expect(responseHolder.getContent()).andReturn("").times(2) + expect(responseHolder.getContent()).andReturn("").times(4) .andReturn("{\"0\":1, \"1\":10}"); expect(responseHolder.getResponse()).andReturn(response).times(2); expect(response.headers()).andReturn(headers).times(2); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java index 483795f..ad8bb39 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java @@ -355,7 +355,7 @@ public abstract class IndexTaskClient implements AutoCloseable } else if (responseCode == 400) { // don't bother retrying if it's a bad request throw new IAE("Received 400 Bad Request with body: %s", response.getContent()); } else { - throw new IOE("Received status [%d]", responseCode); + throw new IOE("Received status [%d] and content [%s]", responseCode, response.getContent()); } } catch (IOException | ChannelException e) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index f8eebd4..210f46f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -348,10 +348,8 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan @POST @Path("/segment/allocate") @Produces(SmileMediaTypes.APPLICATION_JACKSON_SMILE) - public Response allocateSegment( - DateTime timestamp, - @Context final HttpServletRequest req - ) + @Consumes(SmileMediaTypes.APPLICATION_JACKSON_SMILE) + public Response allocateSegment(DateTime timestamp, @Context final HttpServletRequest req) { ChatHandlers.authorizationCheck( req, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java index 9e64731..98f508f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java @@ -20,8 +20,10 @@ package org.apache.druid.segment.realtime.firehose; import com.google.common.base.Optional; +import com.google.common.collect.Iterables; import com.google.inject.Inject; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.server.initialization.jetty.BadRequestException; import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import javax.ws.rs.Path; @@ -49,9 +51,19 @@ public class ChatHandlerResource public Object doTaskChat(@PathParam("id") String handlerId, @Context HttpHeaders headers) { if (taskId != null) { - List<String> requestTaskId = headers.getRequestHeader(TASK_ID_HEADER); - if (requestTaskId != null && !requestTaskId.contains(StringUtils.urlEncode(taskId))) { - return null; + final List<String> requestTaskIds = headers.getRequestHeader(TASK_ID_HEADER); + final String requestTaskId = requestTaskIds != null && !requestTaskIds.isEmpty() + ? Iterables.getOnlyElement(requestTaskIds) + : null; + + // Sanity check: Callers set TASK_ID_HEADER to our taskId (URL-encoded, if >= 0.14.0) if they want to be + // assured of talking to the correct task, and not just some other task running on the same port. + if (requestTaskId != null + && !requestTaskId.equals(taskId) + && !StringUtils.urlDecode(requestTaskId).equals(taskId)) { + throw new BadRequestException( + StringUtils.format("Requested taskId[%s] doesn't match with taskId[%s]", requestTaskId, taskId) + ); } } @@ -61,6 +73,6 @@ public class ChatHandlerResource return handler.get(); } - return null; + throw new BadRequestException(StringUtils.format("Can't find chatHandler for handler[%s]", handlerId)); } } diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java new file mode 100644 index 0000000..8badcab --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.initialization.jetty; + +public class BadRequestException extends RuntimeException +{ + public BadRequestException(String msg) + { + super(msg); + } +} diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestExceptionMapper.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestExceptionMapper.java new file mode 100644 index 0000000..8a98a3d --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestExceptionMapper.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.initialization.jetty; + +import com.google.common.collect.ImmutableMap; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +@Provider +public class BadRequestExceptionMapper implements ExceptionMapper<BadRequestException> +{ + @Override + public Response toResponse(BadRequestException exception) + { + return Response.status(Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(ImmutableMap.of("error", exception.getMessage())) + .build(); + } +} diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java index f67613d..27ee5f1 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java @@ -112,6 +112,7 @@ public class JettyServerModule extends JerseyServletModule binder.bind(DruidGuiceContainer.class).in(Scopes.SINGLETON); binder.bind(CustomExceptionMapper.class).in(Singleton.class); binder.bind(ForbiddenExceptionMapper.class).in(Singleton.class); + binder.bind(BadRequestExceptionMapper.class).in(Singleton.class); serve("/*").with(DruidGuiceContainer.class); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
