This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a95397e712a Allow request headers in HttpInputSource in native and MSQ
Ingestion (#16974)
a95397e712a is described below
commit a95397e712a7a5a53d43f5888cfe2077c2c74ae6
Author: Pranav <[email protected]>
AuthorDate: Wed Sep 11 22:48:44 2024 -0700
Allow request headers in HttpInputSource in native and MSQ Ingestion
(#16974)
Support for adding the request headers in http input source. we can now
pass the additional headers as json in both native and MSQ.
---
.../druid/indexing/overlord/TaskQueueTest.java | 3 +-
.../apache/druid/data/input/impl/HttpEntity.java | 16 ++++-
.../druid/data/input/impl/HttpInputSource.java | 58 ++++++++++++++---
.../data/input/impl/HttpInputSourceConfig.java | 23 ++++++-
.../druid/data/input/impl/HttpEntityTest.java | 67 ++++++++++++++++++--
.../data/input/impl/HttpInputSourceConfigTest.java | 20 +++++-
.../druid/data/input/impl/HttpInputSourceTest.java | 72 +++++++++++++++++++---
.../input/impl/InputEntityIteratingReaderTest.java | 2 +-
.../catalog/model/table/HttpInputSourceDefn.java | 39 +++++++++++-
.../catalog/model/table/ExternalTableTest.java | 6 +-
.../model/table/HttpInputSourceDefnTest.java | 33 ++++++----
.../metadata/input/InputSourceModuleTest.java | 2 +-
.../druid/sql/calcite/CalciteInsertDmlTest.java | 2 +-
.../druid/sql/calcite/IngestTableFunctionTest.java | 15 +++--
.../expected/ingest/httpExtern-logicalPlan.txt | 2 +-
15 files changed, 304 insertions(+), 56 deletions(-)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index ee90a3335a1..c7b7b13ef7e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -519,7 +519,7 @@ public class TaskQueueTest extends IngestionTestBase
final String password = "AbCd_1234";
final ObjectMapper mapper = getObjectMapper();
- final HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(Collections.singleton("http"));
+ final HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(Collections.singleton("http"), null);
mapper.setInjectableValues(new InjectableValues.Std()
.addValue(HttpInputSourceConfig.class,
httpInputSourceConfig)
.addValue(ObjectMapper.class, new
DefaultObjectMapper())
@@ -562,6 +562,7 @@ public class TaskQueueTest extends IngestionTestBase
"user",
new DefaultPasswordProvider(password),
null,
+ null,
httpInputSourceConfig),
new NoopInputFormat(),
null,
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java
b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java
index e03495ce02a..b0c415d322b 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java
@@ -34,6 +34,7 @@ import java.io.InputStream;
import java.net.URI;
import java.net.URLConnection;
import java.util.Base64;
+import java.util.Map;
public class HttpEntity extends RetryingInputEntity
{
@@ -45,15 +46,19 @@ public class HttpEntity extends RetryingInputEntity
@Nullable
private final PasswordProvider httpAuthenticationPasswordProvider;
+ private final Map<String, String> requestHeaders;
+
HttpEntity(
URI uri,
@Nullable String httpAuthenticationUsername,
- @Nullable PasswordProvider httpAuthenticationPasswordProvider
+ @Nullable PasswordProvider httpAuthenticationPasswordProvider,
+ @Nullable Map<String, String> requestHeaders
)
{
this.uri = uri;
this.httpAuthenticationUsername = httpAuthenticationUsername;
this.httpAuthenticationPasswordProvider =
httpAuthenticationPasswordProvider;
+ this.requestHeaders = requestHeaders;
}
@Override
@@ -65,7 +70,7 @@ public class HttpEntity extends RetryingInputEntity
@Override
protected InputStream readFrom(long offset) throws IOException
{
- return openInputStream(uri, httpAuthenticationUsername,
httpAuthenticationPasswordProvider, offset);
+ return openInputStream(uri, httpAuthenticationUsername,
httpAuthenticationPasswordProvider, offset, requestHeaders);
}
@Override
@@ -80,10 +85,15 @@ public class HttpEntity extends RetryingInputEntity
return t -> t instanceof IOException;
}
- public static InputStream openInputStream(URI object, String userName,
PasswordProvider passwordProvider, long offset)
+ public static InputStream openInputStream(URI object, String userName,
PasswordProvider passwordProvider, long offset, final Map<String, String>
requestHeaders)
throws IOException
{
final URLConnection urlConnection = object.toURL().openConnection();
+ if (requestHeaders != null && requestHeaders.size() > 0) {
+ for (Map.Entry<String, String> entry : requestHeaders.entrySet()) {
+ urlConnection.addRequestProperty(entry.getKey(), entry.getValue());
+ }
+ }
if (!Strings.isNullOrEmpty(userName) && passwordProvider != null) {
String userPass = userName + ":" + passwordProvider.getPassword();
String basicAuthString = "Basic " +
Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass));
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
index 0ef8194f1e1..12f2316fb67 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
@@ -36,6 +36,7 @@ import
org.apache.druid.data.input.impl.systemfield.SystemField;
import
org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
import org.apache.druid.data.input.impl.systemfield.SystemFieldInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
@@ -47,6 +48,7 @@ import java.io.File;
import java.net.URI;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
@@ -64,6 +66,7 @@ public class HttpInputSource
private final PasswordProvider httpAuthenticationPasswordProvider;
private final SystemFields systemFields;
private final HttpInputSourceConfig config;
+ private final Map<String, String> requestHeaders;
@JsonCreator
public HttpInputSource(
@@ -71,6 +74,7 @@ public class HttpInputSource
@JsonProperty("httpAuthenticationUsername") @Nullable String
httpAuthenticationUsername,
@JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider
httpAuthenticationPasswordProvider,
@JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields
systemFields,
+ @JsonProperty("requestHeaders") @Nullable Map<String, String>
requestHeaders,
@JacksonInject HttpInputSourceConfig config
)
{
@@ -80,17 +84,11 @@ public class HttpInputSource
this.httpAuthenticationUsername = httpAuthenticationUsername;
this.httpAuthenticationPasswordProvider =
httpAuthenticationPasswordProvider;
this.systemFields = systemFields == null ? SystemFields.none() :
systemFields;
+ this.requestHeaders = requestHeaders == null ? Collections.emptyMap() :
requestHeaders;
+ throwIfForbiddenHeaders(config, this.requestHeaders);
this.config = config;
}
- @JsonIgnore
- @Nonnull
- @Override
- public Set<String> getTypes()
- {
- return Collections.singleton(TYPE_KEY);
- }
-
public static void throwIfInvalidProtocols(HttpInputSourceConfig config,
List<URI> uris)
{
for (URI uri : uris) {
@@ -100,6 +98,27 @@ public class HttpInputSource
}
}
+ public static void throwIfForbiddenHeaders(HttpInputSourceConfig config,
Map<String, String> requestHeaders)
+ {
+ if (config.getAllowedHeaders().size() > 0) {
+ for (Map.Entry<String, String> entry : requestHeaders.entrySet()) {
+ if
(!config.getAllowedHeaders().contains(StringUtils.toLowerCase(entry.getKey())))
{
+ throw InvalidInput.exception("Got forbidden header %s, allowed
headers are only %s ",
+ entry.getKey(),
config.getAllowedHeaders()
+ );
+ }
+ }
+ }
+ }
+
+ @JsonIgnore
+ @Nonnull
+ @Override
+ public Set<String> getTypes()
+ {
+ return Collections.singleton(TYPE_KEY);
+ }
+
@JsonProperty
public List<URI> getUris()
{
@@ -128,6 +147,14 @@ public class HttpInputSource
return httpAuthenticationPasswordProvider;
}
+ @Nullable
+ @JsonProperty("requestHeaders")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Map<String, String> getRequestHeaders()
+ {
+ return requestHeaders;
+ }
+
@Override
public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec)
{
@@ -148,6 +175,7 @@ public class HttpInputSource
httpAuthenticationUsername,
httpAuthenticationPasswordProvider,
systemFields,
+ requestHeaders,
config
);
}
@@ -181,7 +209,8 @@ public class HttpInputSource
createSplits(inputFormat, null).map(split -> new HttpEntity(
split.get(),
httpAuthenticationUsername,
- httpAuthenticationPasswordProvider
+ httpAuthenticationPasswordProvider,
+ requestHeaders
)).iterator()
),
SystemFieldDecoratorFactory.fromInputSource(this),
@@ -203,13 +232,21 @@ public class HttpInputSource
&& Objects.equals(httpAuthenticationUsername,
that.httpAuthenticationUsername)
&& Objects.equals(httpAuthenticationPasswordProvider,
that.httpAuthenticationPasswordProvider)
&& Objects.equals(systemFields, that.systemFields)
+ && Objects.equals(requestHeaders, that.requestHeaders)
&& Objects.equals(config, that.config);
}
@Override
public int hashCode()
{
- return Objects.hash(uris, httpAuthenticationUsername,
httpAuthenticationPasswordProvider, systemFields, config);
+ return Objects.hash(
+ uris,
+ httpAuthenticationUsername,
+ httpAuthenticationPasswordProvider,
+ systemFields,
+ requestHeaders,
+ config
+ );
}
@Override
@@ -226,6 +263,7 @@ public class HttpInputSource
", httpAuthenticationUsername=" + httpAuthenticationUsername +
", httpAuthenticationPasswordProvider=" +
httpAuthenticationPasswordProvider +
(systemFields.getFields().isEmpty() ? "" : ", systemFields=" +
systemFields) +
+ ", requestHeaders = " + requestHeaders +
"}";
}
}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
index 310c6690461..1299edbc574 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -38,14 +39,21 @@ public class HttpInputSourceConfig
@JsonProperty
private final Set<String> allowedProtocols;
+ @JsonProperty
+ private final Set<String> allowedHeaders;
+
@JsonCreator
public HttpInputSourceConfig(
- @JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols
+ @JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols,
+ @JsonProperty("allowedHeaders") @Nullable Set<String> allowedHeaders
)
{
this.allowedProtocols = allowedProtocols == null ||
allowedProtocols.isEmpty()
? DEFAULT_ALLOWED_PROTOCOLS
:
allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
+ this.allowedHeaders = allowedHeaders == null
+ ? Collections.emptySet()
+ :
allowedHeaders.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
}
public Set<String> getAllowedProtocols()
@@ -53,6 +61,11 @@ public class HttpInputSourceConfig
return allowedProtocols;
}
+ public Set<String> getAllowedHeaders()
+ {
+ return allowedHeaders;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -63,13 +76,16 @@ public class HttpInputSourceConfig
return false;
}
HttpInputSourceConfig that = (HttpInputSourceConfig) o;
- return Objects.equals(allowedProtocols, that.allowedProtocols);
+ return Objects.equals(allowedProtocols, that.allowedProtocols) &&
Objects.equals(
+ allowedHeaders,
+ that.allowedHeaders
+ );
}
@Override
public int hashCode()
{
- return Objects.hash(allowedProtocols);
+ return Objects.hash(allowedProtocols, allowedHeaders);
}
@Override
@@ -77,6 +93,7 @@ public class HttpInputSourceConfig
{
return "HttpInputSourceConfig{" +
"allowedProtocols=" + allowedProtocols +
+ ", allowedHeaders=" + allowedHeaders +
'}';
}
}
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java
index 59217456dbf..8776cd27573 100644
---
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java
@@ -19,7 +19,9 @@
package org.apache.druid.data.input.impl;
+import com.google.common.collect.ImmutableMap;
import com.google.common.net.HttpHeaders;
+import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpServer;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.StringUtils;
@@ -42,6 +44,8 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
public class HttpEntityTest
{
@@ -96,8 +100,61 @@ public class HttpEntityTest
server.start();
URI url = new URI("http://" + server.getAddress().getHostName() + ":" +
server.getAddress().getPort() + "/test");
- inputStream = HttpEntity.openInputStream(url, "", null, 0);
- inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5);
+ inputStream = HttpEntity.openInputStream(url, "", null, 0,
Collections.emptyMap());
+ inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5,
Collections.emptyMap());
+ inputStream.skip(5);
+ Assert.assertTrue(IOUtils.contentEquals(inputStream,
inputStreamPartial));
+ }
+ finally {
+ IOUtils.closeQuietly(inputStream);
+ IOUtils.closeQuietly(inputStreamPartial);
+ if (server != null) {
+ server.stop(0);
+ }
+ if (serverSocket != null) {
+ serverSocket.close();
+ }
+ }
+ }
+
+ @Test
+ public void testRequestHeaders() throws IOException, URISyntaxException
+ {
+ HttpServer server = null;
+ InputStream inputStream = null;
+ InputStream inputStreamPartial = null;
+ ServerSocket serverSocket = null;
+ Map<String, String> requestHeaders = ImmutableMap.of("r-Cookie", "test",
"Content-Type", "application/json");
+ try {
+ serverSocket = new ServerSocket(0);
+ int port = serverSocket.getLocalPort();
+ // closing port so that the httpserver can use. Can cause race
conditions.
+ serverSocket.close();
+ server = HttpServer.create(new InetSocketAddress("localhost", port), 0);
+ server.createContext(
+ "/test",
+ (httpExchange) -> {
+ Headers headers = httpExchange.getRequestHeaders();
+ for (Map.Entry<String, String> entry : requestHeaders.entrySet()) {
+ Assert.assertTrue(headers.containsKey(entry.getKey()));
+ Assert.assertEquals(headers.get(entry.getKey()).get(0),
entry.getValue());
+ }
+ String payload = "12345678910";
+ byte[] outputBytes = payload.getBytes(StandardCharsets.UTF_8);
+ httpExchange.sendResponseHeaders(200, outputBytes.length);
+ OutputStream os = httpExchange.getResponseBody();
+ httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_TYPE,
"application/octet-stream");
+ httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_LENGTH,
String.valueOf(outputBytes.length));
+ httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_RANGE,
"bytes 0");
+ os.write(outputBytes);
+ os.close();
+ }
+ );
+ server.start();
+
+ URI url = new URI("http://" + server.getAddress().getHostName() + ":" +
server.getAddress().getPort() + "/test");
+ inputStream = HttpEntity.openInputStream(url, "", null, 0,
requestHeaders);
+ inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5,
requestHeaders);
inputStream.skip(5);
Assert.assertTrue(IOUtils.contentEquals(inputStream,
inputStreamPartial));
}
@@ -119,7 +176,7 @@ public class HttpEntityTest
long offset = 15;
String contentRange = StringUtils.format("bytes %d-%d/%d", offset, 1000,
1000);
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn(contentRange);
- HttpEntity.openInputStream(uri, "", null, offset);
+ HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap());
Mockito.verify(inputStreamMock, Mockito.times(0)).skip(offset);
}
@@ -128,7 +185,7 @@ public class HttpEntityTest
{
long offset = 15;
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn(null);
- HttpEntity.openInputStream(uri, "", null, offset);
+ HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap());
Mockito.verify(inputStreamMock, Mockito.times(1)).skip(offset);
}
@@ -137,7 +194,7 @@ public class HttpEntityTest
{
long offset = 15;
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn("token
2-12/12");
- HttpEntity.openInputStream(uri, "", null, offset);
+ HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap());
Mockito.verify(inputStreamMock, Mockito.times(1)).skip(offset);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
index 7f88fc7bf62..74cc62b4d81 100644
---
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
@@ -24,6 +24,8 @@ import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Collections;
+
public class HttpInputSourceConfigTest
{
@Test
@@ -35,21 +37,33 @@ public class HttpInputSourceConfigTest
@Test
public void testNullAllowedProtocolsUseDefault()
{
- HttpInputSourceConfig config = new HttpInputSourceConfig(null);
+ HttpInputSourceConfig config = new HttpInputSourceConfig(null, null);
Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS,
config.getAllowedProtocols());
+ Assert.assertEquals(Collections.emptySet(), config.getAllowedHeaders());
}
@Test
public void testEmptyAllowedProtocolsUseDefault()
{
- HttpInputSourceConfig config = new
HttpInputSourceConfig(ImmutableSet.of());
+ HttpInputSourceConfig config = new
HttpInputSourceConfig(ImmutableSet.of(), null);
Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS,
config.getAllowedProtocols());
}
@Test
public void testCustomAllowedProtocols()
{
- HttpInputSourceConfig config = new
HttpInputSourceConfig(ImmutableSet.of("druid"));
+ HttpInputSourceConfig config = new
HttpInputSourceConfig(ImmutableSet.of("druid"), null);
+ Assert.assertEquals(ImmutableSet.of("druid"),
config.getAllowedProtocols());
+ }
+
+ @Test
+ public void testAllowedHeaders()
+ {
+ HttpInputSourceConfig config = new HttpInputSourceConfig(
+ ImmutableSet.of("druid"),
+ ImmutableSet.of("Content-Type", "Referer")
+ );
Assert.assertEquals(ImmutableSet.of("druid"),
config.getAllowedProtocols());
+ Assert.assertEquals(ImmutableSet.of("content-type", "referer"),
config.getAllowedHeaders());
}
}
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
index bcd6152f05d..118b56838b6 100644
---
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
@@ -22,11 +22,15 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.junit.Assert;
import org.junit.Rule;
@@ -35,7 +39,10 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
public class HttpInputSourceTest
{
@@ -45,7 +52,7 @@ public class HttpInputSourceTest
@Test
public void testSerde() throws IOException
{
- HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(null);
+ HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(null, null);
final ObjectMapper mapper = new ObjectMapper();
mapper.setInjectableValues(new Std().addValue(HttpInputSourceConfig.class,
httpInputSourceConfig));
final HttpInputSource source = new HttpInputSource(
@@ -53,6 +60,7 @@ public class HttpInputSourceTest
"myName",
new DefaultPasswordProvider("myPassword"),
new SystemFields(EnumSet.of(SystemField.URI)),
+ null,
httpInputSourceConfig
);
final byte[] json = mapper.writeValueAsBytes(source);
@@ -68,7 +76,8 @@ public class HttpInputSourceTest
"myName",
new DefaultPasswordProvider("myPassword"),
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
new HttpInputSource(
@@ -76,7 +85,8 @@ public class HttpInputSourceTest
"myName",
new DefaultPasswordProvider("myPassword"),
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
expectedException.expect(IllegalArgumentException.class);
@@ -86,19 +96,21 @@ public class HttpInputSourceTest
"myName",
new DefaultPasswordProvider("myPassword"),
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
}
@Test
public void testConstructorAllowsOnlyCustomProtocols()
{
- final HttpInputSourceConfig customConfig = new
HttpInputSourceConfig(ImmutableSet.of("druid"));
+ final HttpInputSourceConfig customConfig = new
HttpInputSourceConfig(ImmutableSet.of("druid"), null);
new HttpInputSource(
ImmutableList.of(URI.create("druid:///")),
"myName",
new DefaultPasswordProvider("myPassword"),
null,
+ null,
customConfig
);
@@ -109,6 +121,7 @@ public class HttpInputSourceTest
"myName",
new DefaultPasswordProvider("myPassword"),
null,
+ null,
customConfig
);
}
@@ -116,12 +129,13 @@ public class HttpInputSourceTest
@Test
public void testSystemFields()
{
- HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(null);
+ HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(null, null);
final HttpInputSource inputSource = new HttpInputSource(
ImmutableList.of(URI.create("http://test.com/http-test")),
"myName",
new DefaultPasswordProvider("myPassword"),
new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)),
+ null,
httpInputSourceConfig
);
@@ -130,10 +144,54 @@ public class HttpInputSourceTest
inputSource.getConfiguredSystemFields()
);
- final HttpEntity entity = new
HttpEntity(URI.create("https://example.com/foo"), null, null);
+ final HttpEntity entity = new
HttpEntity(URI.create("https://example.com/foo"), null, null, null);
Assert.assertEquals("https://example.com/foo",
inputSource.getSystemFieldValue(entity, SystemField.URI));
Assert.assertEquals("/foo", inputSource.getSystemFieldValue(entity,
SystemField.PATH));
+ Assert.assertEquals(inputSource.getRequestHeaders(),
Collections.emptyMap());
+ }
+
+ @Test
+ public void testAllowedHeaders()
+ {
+ HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(
+ null,
+ Sets.newHashSet("R-cookie", "Content-type")
+ );
+ final HttpInputSource inputSource = new HttpInputSource(
+ ImmutableList.of(URI.create("http://test.com/http-test")),
+ "myName",
+ new DefaultPasswordProvider("myPassword"),
+ new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)),
+ ImmutableMap.of("r-Cookie", "test", "Content-Type",
"application/json"),
+ httpInputSourceConfig
+ );
+ Set<String> expectedSet = inputSource.getRequestHeaders()
+ .keySet()
+ .stream()
+ .map(StringUtils::toLowerCase)
+ .collect(Collectors.toSet());
+ Assert.assertEquals(expectedSet,
httpInputSourceConfig.getAllowedHeaders());
+ }
+
+ @Test
+ public void shouldFailOnForbiddenHeaders()
+ {
+ HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(
+ null,
+ Sets.newHashSet("R-cookie", "Content-type")
+ );
+ expectedException.expect(DruidException.class);
+ expectedException.expectMessage(
+ "Got forbidden header G-Cookie, allowed headers are only [r-cookie,
content-type]");
+ new HttpInputSource(
+ ImmutableList.of(URI.create("http://test.com/http-test")),
+ "myName",
+ new DefaultPasswordProvider("myPassword"),
+ new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)),
+ ImmutableMap.of("G-Cookie", "test", "Content-Type",
"application/json"),
+ httpInputSourceConfig
+ );
}
@Test
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
index 80602d0508a..5f1b5f365fb 100644
---
a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
@@ -204,7 +204,7 @@ public class InputEntityIteratingReaderTest extends
InitializedNullHandlingTest
),
CloseableIterators.withEmptyBaggage(
ImmutableList.of(
- new HttpEntity(new URI("testscheme://some/path"), null, null)
+ new HttpEntity(new URI("testscheme://some/path"), null, null,
null)
{
@Override
protected int getMaxRetries()
diff --git
a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
index b4a3cf3425a..110974d18ca 100644
---
a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
+++
b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
@@ -19,6 +19,8 @@
package org.apache.druid.catalog.model.table;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.model.CatalogUtils;
import org.apache.druid.catalog.model.ColumnSpec;
@@ -27,6 +29,7 @@ import
org.apache.druid.catalog.model.table.TableFunction.ParameterDefn;
import org.apache.druid.catalog.model.table.TableFunction.ParameterType;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.HttpInputSource;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.metadata.DefaultPasswordProvider;
@@ -93,6 +96,7 @@ public class HttpInputSourceDefn extends
FormattedInputSourceDefn
public static final String PASSWORD_PARAMETER = "password";
public static final String PASSWORD_ENV_VAR_PARAMETER = "passwordEnvVar";
+ public static final String HEADERS = "headers";
private static final List<ParameterDefn> URI_PARAMS =
Collections.singletonList(
new Parameter(URIS_PARAMETER, ParameterType.VARCHAR_ARRAY, true)
);
@@ -103,10 +107,15 @@ public class HttpInputSourceDefn extends
FormattedInputSourceDefn
new Parameter(PASSWORD_ENV_VAR_PARAMETER, ParameterType.VARCHAR, true)
);
+ private static final List<ParameterDefn> HEADERS_PARAMS =
Collections.singletonList(
+ new Parameter(HEADERS, ParameterType.VARCHAR, true)
+ );
+
// Field names in the HttpInputSource
protected static final String URIS_FIELD = "uris";
protected static final String PASSWORD_FIELD = "httpAuthenticationPassword";
protected static final String USERNAME_FIELD = "httpAuthenticationUsername";
+ protected static final String HEADERS_FIELD = "requestHeaders";
@Override
public String typeValue()
@@ -201,7 +210,7 @@ public class HttpInputSourceDefn extends
FormattedInputSourceDefn
@Override
protected List<ParameterDefn> adHocTableFnParameters()
{
- return CatalogUtils.concatLists(URI_PARAMS, USER_PWD_PARAMS);
+ return CatalogUtils.concatLists(URI_PARAMS,
CatalogUtils.concatLists(USER_PWD_PARAMS, HEADERS_PARAMS));
}
@Override
@@ -210,6 +219,7 @@ public class HttpInputSourceDefn extends
FormattedInputSourceDefn
jsonMap.put(InputSource.TYPE_PROPERTY, HttpInputSource.TYPE_KEY);
convertUriArg(jsonMap, args);
convertUserPasswordArgs(jsonMap, args);
+ convertHeaderArg(jsonMap, args);
}
@Override
@@ -228,6 +238,10 @@ public class HttpInputSourceDefn extends
FormattedInputSourceDefn
params = CatalogUtils.concatLists(params, USER_PWD_PARAMS);
}
+ if (!sourceMap.containsKey(HEADERS_FIELD)) {
+ params = CatalogUtils.concatLists(params, HEADERS_PARAMS);
+ }
+
// Does the table define a format?
if (table.inputFormatMap == null) {
params = addFormatParameters(params);
@@ -255,6 +269,9 @@ public class HttpInputSourceDefn extends
FormattedInputSourceDefn
if (!sourceMap.containsKey(USERNAME_FIELD) &&
!sourceMap.containsKey(PASSWORD_FIELD)) {
convertUserPasswordArgs(sourceMap, args);
}
+ if (!sourceMap.containsKey(HEADERS_FIELD)) {
+ convertHeaderArg(sourceMap, args);
+ }
return convertPartialFormattedTable(table, args, columns, sourceMap);
}
@@ -283,6 +300,26 @@ public class HttpInputSourceDefn extends
FormattedInputSourceDefn
}
}
+ /**
+ * URIs in SQL is in the form of a string that contains a comma-delimited
+ * set of URIs. Done since SQL doesn't support array scalars.
+ */
+ private void convertHeaderArg(Map<String, Object> jsonMap, Map<String,
Object> args)
+ {
+ String requestHeaders = CatalogUtils.getString(args, HEADERS);
+ Map<String, String> headersMap;
+ if (requestHeaders != null) {
+ try {
+ headersMap = DefaultObjectMapper.INSTANCE.readValue(requestHeaders,
new TypeReference<Map<String, String>>(){});
+ }
+ catch (JsonProcessingException e) {
+ throw new ISE("Failed read map from headers json");
+ }
+ jsonMap.put(HEADERS_FIELD, headersMap);
+ }
+
+ }
+
/**
* Convert the user name and password. All are SQL strings. Passwords must
be in
* the form of a password provider, so do the needed conversion. HTTP
provides
diff --git
a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
index 8c8129bf0fc..1992f98e2ff 100644
---
a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
+++
b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
@@ -170,7 +170,8 @@ public class ExternalTableTest extends BaseExternTableTest
"bob",
new DefaultPasswordProvider("secret"),
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
Map<String, Object> sourceMap = toMap(inputSource);
sourceMap.remove("uris");
@@ -195,7 +196,8 @@ public class ExternalTableTest extends BaseExternTableTest
"bob",
new DefaultPasswordProvider("secret"),
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
TableMetadata table = TableBuilder.external("koala")
.inputSource(toMap(inputSource))
diff --git
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
index 8a6385db59c..e60139824d7 100644
---
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
+++
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
@@ -57,7 +57,7 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
{
mapper.setInjectableValues(new InjectableValues.Std().addValue(
HttpInputSourceConfig.class,
- new
HttpInputSourceConfig(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS)
+ new
HttpInputSourceConfig(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, null)
));
}
@@ -99,7 +99,8 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
null,
null,
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
@@ -119,7 +120,8 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
null,
null,
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
@@ -150,7 +152,8 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
null,
null,
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
@@ -216,7 +219,8 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
"bob",
new DefaultPasswordProvider("secret"),
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
@@ -272,11 +276,12 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
// Get the partial table function
TableFunction fn = externDefn.tableFn(resolved);
- assertEquals(4, fn.parameters().size());
+ assertEquals(5, fn.parameters().size());
assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER));
assertTrue(hasParam(fn, HttpInputSourceDefn.USER_PARAMETER));
assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_PARAMETER));
assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_ENV_VAR_PARAMETER));
+ assertTrue(hasParam(fn, HttpInputSourceDefn.HEADERS));
// Convert to an external table.
ExternalTableSpec externSpec = fn.apply(
@@ -320,8 +325,9 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
// Get the partial table function
TableFunction fn = externDefn.tableFn(resolved);
- assertEquals(1, fn.parameters().size());
+ assertEquals(2, fn.parameters().size());
assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER));
+ assertTrue(hasParam(fn, HttpInputSourceDefn.HEADERS));
// Convert to an external table.
ExternalTableSpec externSpec = fn.apply(
@@ -344,7 +350,8 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
"bob",
new DefaultPasswordProvider("secret"),
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
TableMetadata table = TableBuilder.external("foo")
.inputSource(httpToMap(inputSource))
@@ -382,7 +389,8 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
"bob",
new EnvironmentVariablePasswordProvider("SECRET"),
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
@@ -415,7 +423,8 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
"bob",
new DefaultPasswordProvider("secret"),
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
TableMetadata table = TableBuilder.external("foo")
.inputSource(httpToMap(inputSource))
@@ -484,7 +493,8 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
"bob",
new EnvironmentVariablePasswordProvider("SECRET"),
null,
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
);
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
@@ -518,7 +528,6 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
assertEquals("secret", ((DefaultPasswordProvider)
sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
}
assertEquals("http://foo.com/my.csv",
sourceSpec.getUris().get(0).toString());
-
// Just a sanity check: details of CSV conversion are tested elsewhere.
CsvInputFormat csvFormat = (CsvInputFormat) externSpec.inputFormat;
assertEquals(Arrays.asList("x", "y"), csvFormat.getColumns());
diff --git
a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
index 074b1dbeb22..496f45596a2 100644
---
a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
@@ -79,7 +79,7 @@ public class InputSourceModuleTest
Properties props = new Properties();
Injector injector = makeInjectorWithProperties(props);
HttpInputSourceConfig instance =
injector.getInstance(HttpInputSourceConfig.class);
- Assert.assertEquals(new HttpInputSourceConfig(null), instance);
+ Assert.assertEquals(new HttpInputSourceConfig(null, null), instance);
Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS,
instance.getAllowedProtocols());
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index ed4aa6d91eb..fbcb0735c86 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -860,7 +860,7 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
// Test correctness of the query when only the CLUSTERED BY clause is
present
- final String explanation =
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"join\",\"left\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://boo.gz\"]},\"inputFormat\":{\"type\":\"json\"},\"signature\":[{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}]},\"right\":{
[...]
+ final String explanation =
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"join\",\"left\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://boo.gz\"],\"requestHeaders\":{}},\"inputFormat\":{\"type\":\"json\"},\"signature\":[{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"S
[...]
testQuery(
PLANNER_CONFIG_NATIVE_QUERY_EXPLAIN,
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
index 29200730fee..7401477e6d7 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.calcite.avatica.SqlType;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.data.input.impl.CsvInputFormat;
@@ -86,7 +87,8 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
"bob",
new DefaultPasswordProvider("secret"),
SystemFields.none(),
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
),
new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false,
0),
RowSignature.builder()
@@ -259,7 +261,8 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
"bob",
new DefaultPasswordProvider("secret"),
SystemFields.none(),
- new HttpInputSourceConfig(null)
+ ImmutableMap.of("Accept", "application/ndjson", "a", "b"),
+ new HttpInputSourceConfig(null, null)
),
new CsvInputFormat(ImmutableList.of("timestamp", "isRobot"), null,
false, false, 0),
RowSignature.builder()
@@ -280,7 +283,8 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
" userName => 'bob',\n" +
" password => 'secret',\n" +
" uris => ARRAY['http://example.com/foo.csv',
'http://example.com/bar.csv'],\n" +
- " format => 'csv'\n" +
+ " format => 'csv',\n" +
+ " headers=> '{\"Accept\":\"application/ndjson\", \"a\": \"b\"
}'\n" +
" )\n" +
") EXTEND (\"timestamp\" VARCHAR, isRobot VARCHAR)\n" +
"PARTITIONED BY HOUR")
@@ -313,7 +317,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
" format => 'csv'))\n" +
" EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" +
"PARTITIONED BY ALL TIME";
- final String explanation =
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"}},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"
[...]
+ final String explanation =
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"},\"requestHeaders\":{}},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\
[...]
final String resources =
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
final String attributes =
"{\"statementType\":\"INSERT\",\"targetDataSource\":\"dst\",\"partitionedBy\":{\"type\":\"all\"}}";
@@ -390,7 +394,8 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
"bob",
new DefaultPasswordProvider("secret"),
SystemFields.none(),
- new HttpInputSourceConfig(null)
+ null,
+ new HttpInputSourceConfig(null, null)
),
new JsonInputFormat(null, null, null, null, null),
RowSignature.builder()
diff --git
a/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt
b/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt
index 18dcef56af4..f9e4c4a5de2 100644
--- a/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt
+++ b/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt
@@ -1,3 +1,3 @@
LogicalInsert(target=[dst], partitionedBy=[ALL TIME], clusteredBy=[<none>])
LogicalProject(inputs=[0..2])
-
ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}])
+
ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"},"requestHeaders":{}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]