This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a95397e712a Allow request headers in HttpInputSource in native and MSQ 
 Ingestion (#16974)
a95397e712a is described below

commit a95397e712a7a5a53d43f5888cfe2077c2c74ae6
Author: Pranav <[email protected]>
AuthorDate: Wed Sep 11 22:48:44 2024 -0700

    Allow request headers in HttpInputSource in native and MSQ  Ingestion 
(#16974)
    
    Add support for request headers in the HTTP input source. Additional 
headers can now be passed as JSON in both native and MSQ ingestion.
---
 .../druid/indexing/overlord/TaskQueueTest.java     |  3 +-
 .../apache/druid/data/input/impl/HttpEntity.java   | 16 ++++-
 .../druid/data/input/impl/HttpInputSource.java     | 58 ++++++++++++++---
 .../data/input/impl/HttpInputSourceConfig.java     | 23 ++++++-
 .../druid/data/input/impl/HttpEntityTest.java      | 67 ++++++++++++++++++--
 .../data/input/impl/HttpInputSourceConfigTest.java | 20 +++++-
 .../druid/data/input/impl/HttpInputSourceTest.java | 72 +++++++++++++++++++---
 .../input/impl/InputEntityIteratingReaderTest.java |  2 +-
 .../catalog/model/table/HttpInputSourceDefn.java   | 39 +++++++++++-
 .../catalog/model/table/ExternalTableTest.java     |  6 +-
 .../model/table/HttpInputSourceDefnTest.java       | 33 ++++++----
 .../metadata/input/InputSourceModuleTest.java      |  2 +-
 .../druid/sql/calcite/CalciteInsertDmlTest.java    |  2 +-
 .../druid/sql/calcite/IngestTableFunctionTest.java | 15 +++--
 .../expected/ingest/httpExtern-logicalPlan.txt     |  2 +-
 15 files changed, 304 insertions(+), 56 deletions(-)

diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index ee90a3335a1..c7b7b13ef7e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -519,7 +519,7 @@ public class TaskQueueTest extends IngestionTestBase
     final String password = "AbCd_1234";
     final ObjectMapper mapper = getObjectMapper();
 
-    final HttpInputSourceConfig httpInputSourceConfig = new 
HttpInputSourceConfig(Collections.singleton("http"));
+    final HttpInputSourceConfig httpInputSourceConfig = new 
HttpInputSourceConfig(Collections.singleton("http"), null);
     mapper.setInjectableValues(new InjectableValues.Std()
                                    .addValue(HttpInputSourceConfig.class, 
httpInputSourceConfig)
                                    .addValue(ObjectMapper.class, new 
DefaultObjectMapper())
@@ -562,6 +562,7 @@ public class TaskQueueTest extends IngestionTestBase
                             "user",
                             new DefaultPasswordProvider(password),
                             null,
+                            null,
                             httpInputSourceConfig),
         new NoopInputFormat(),
         null,
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java 
b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java
index e03495ce02a..b0c415d322b 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java
@@ -34,6 +34,7 @@ import java.io.InputStream;
 import java.net.URI;
 import java.net.URLConnection;
 import java.util.Base64;
+import java.util.Map;
 
 public class HttpEntity extends RetryingInputEntity
 {
@@ -45,15 +46,19 @@ public class HttpEntity extends RetryingInputEntity
   @Nullable
   private final PasswordProvider httpAuthenticationPasswordProvider;
 
+  private final Map<String, String> requestHeaders;
+
   HttpEntity(
       URI uri,
       @Nullable String httpAuthenticationUsername,
-      @Nullable PasswordProvider httpAuthenticationPasswordProvider
+      @Nullable PasswordProvider httpAuthenticationPasswordProvider,
+      @Nullable Map<String, String> requestHeaders
   )
   {
     this.uri = uri;
     this.httpAuthenticationUsername = httpAuthenticationUsername;
     this.httpAuthenticationPasswordProvider = 
httpAuthenticationPasswordProvider;
+    this.requestHeaders = requestHeaders;
   }
 
   @Override
@@ -65,7 +70,7 @@ public class HttpEntity extends RetryingInputEntity
   @Override
   protected InputStream readFrom(long offset) throws IOException
   {
-    return openInputStream(uri, httpAuthenticationUsername, 
httpAuthenticationPasswordProvider, offset);
+    return openInputStream(uri, httpAuthenticationUsername, 
httpAuthenticationPasswordProvider, offset, requestHeaders);
   }
 
   @Override
@@ -80,10 +85,15 @@ public class HttpEntity extends RetryingInputEntity
     return t -> t instanceof IOException;
   }
 
-  public static InputStream openInputStream(URI object, String userName, 
PasswordProvider passwordProvider, long offset)
+  public static InputStream openInputStream(URI object, String userName, 
PasswordProvider passwordProvider, long offset, final Map<String, String> 
requestHeaders)
       throws IOException
   {
     final URLConnection urlConnection = object.toURL().openConnection();
+    if (requestHeaders != null && requestHeaders.size() > 0) {
+      for (Map.Entry<String, String> entry : requestHeaders.entrySet()) {
+        urlConnection.addRequestProperty(entry.getKey(), entry.getValue());
+      }
+    }
     if (!Strings.isNullOrEmpty(userName) && passwordProvider != null) {
       String userPass = userName + ":" + passwordProvider.getPassword();
       String basicAuthString = "Basic " + 
Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass));
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
 
b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
index 0ef8194f1e1..12f2316fb67 100644
--- 
a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
@@ -36,6 +36,7 @@ import 
org.apache.druid.data.input.impl.systemfield.SystemField;
 import 
org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
 import org.apache.druid.data.input.impl.systemfield.SystemFieldInputSource;
 import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -47,6 +48,7 @@ import java.io.File;
 import java.net.URI;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Stream;
@@ -64,6 +66,7 @@ public class HttpInputSource
   private final PasswordProvider httpAuthenticationPasswordProvider;
   private final SystemFields systemFields;
   private final HttpInputSourceConfig config;
+  private final Map<String, String> requestHeaders;
 
   @JsonCreator
   public HttpInputSource(
@@ -71,6 +74,7 @@ public class HttpInputSource
       @JsonProperty("httpAuthenticationUsername") @Nullable String 
httpAuthenticationUsername,
       @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider 
httpAuthenticationPasswordProvider,
       @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields 
systemFields,
+      @JsonProperty("requestHeaders") @Nullable Map<String, String> 
requestHeaders,
       @JacksonInject HttpInputSourceConfig config
   )
   {
@@ -80,17 +84,11 @@ public class HttpInputSource
     this.httpAuthenticationUsername = httpAuthenticationUsername;
     this.httpAuthenticationPasswordProvider = 
httpAuthenticationPasswordProvider;
     this.systemFields = systemFields == null ? SystemFields.none() : 
systemFields;
+    this.requestHeaders = requestHeaders == null ? Collections.emptyMap() : 
requestHeaders;
+    throwIfForbiddenHeaders(config, this.requestHeaders);
     this.config = config;
   }
 
-  @JsonIgnore
-  @Nonnull
-  @Override
-  public Set<String> getTypes()
-  {
-    return Collections.singleton(TYPE_KEY);
-  }
-
   public static void throwIfInvalidProtocols(HttpInputSourceConfig config, 
List<URI> uris)
   {
     for (URI uri : uris) {
@@ -100,6 +98,27 @@ public class HttpInputSource
     }
   }
 
+  public static void throwIfForbiddenHeaders(HttpInputSourceConfig config, 
Map<String, String> requestHeaders)
+  {
+    if (config.getAllowedHeaders().size() > 0) {
+      for (Map.Entry<String, String> entry : requestHeaders.entrySet()) {
+        if 
(!config.getAllowedHeaders().contains(StringUtils.toLowerCase(entry.getKey()))) 
{
+          throw InvalidInput.exception("Got forbidden header %s, allowed 
headers are only %s ",
+                                       entry.getKey(), 
config.getAllowedHeaders()
+          );
+        }
+      }
+    }
+  }
+
+  @JsonIgnore
+  @Nonnull
+  @Override
+  public Set<String> getTypes()
+  {
+    return Collections.singleton(TYPE_KEY);
+  }
+
   @JsonProperty
   public List<URI> getUris()
   {
@@ -128,6 +147,14 @@ public class HttpInputSource
     return httpAuthenticationPasswordProvider;
   }
 
+  @Nullable
+  @JsonProperty("requestHeaders")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Map<String, String> getRequestHeaders()
+  {
+    return requestHeaders;
+  }
+
   @Override
   public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, 
@Nullable SplitHintSpec splitHintSpec)
   {
@@ -148,6 +175,7 @@ public class HttpInputSource
         httpAuthenticationUsername,
         httpAuthenticationPasswordProvider,
         systemFields,
+        requestHeaders,
         config
     );
   }
@@ -181,7 +209,8 @@ public class HttpInputSource
             createSplits(inputFormat, null).map(split -> new HttpEntity(
                 split.get(),
                 httpAuthenticationUsername,
-                httpAuthenticationPasswordProvider
+                httpAuthenticationPasswordProvider,
+                requestHeaders
             )).iterator()
         ),
         SystemFieldDecoratorFactory.fromInputSource(this),
@@ -203,13 +232,21 @@ public class HttpInputSource
            && Objects.equals(httpAuthenticationUsername, 
that.httpAuthenticationUsername)
            && Objects.equals(httpAuthenticationPasswordProvider, 
that.httpAuthenticationPasswordProvider)
            && Objects.equals(systemFields, that.systemFields)
+           && Objects.equals(requestHeaders, that.requestHeaders)
            && Objects.equals(config, that.config);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(uris, httpAuthenticationUsername, 
httpAuthenticationPasswordProvider, systemFields, config);
+    return Objects.hash(
+        uris,
+        httpAuthenticationUsername,
+        httpAuthenticationPasswordProvider,
+        systemFields,
+        requestHeaders,
+        config
+    );
   }
 
   @Override
@@ -226,6 +263,7 @@ public class HttpInputSource
            ", httpAuthenticationUsername=" + httpAuthenticationUsername +
            ", httpAuthenticationPasswordProvider=" + 
httpAuthenticationPasswordProvider +
            (systemFields.getFields().isEmpty() ? "" : ", systemFields=" + 
systemFields) +
+           ", requestHeaders = " + requestHeaders +
            "}";
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
 
b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
index 310c6690461..1299edbc574 100644
--- 
a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.druid.java.util.common.StringUtils;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -38,14 +39,21 @@ public class HttpInputSourceConfig
   @JsonProperty
   private final Set<String> allowedProtocols;
 
+  @JsonProperty
+  private final Set<String> allowedHeaders;
+
   @JsonCreator
   public HttpInputSourceConfig(
-      @JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols
+      @JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols,
+      @JsonProperty("allowedHeaders") @Nullable Set<String> allowedHeaders
   )
   {
     this.allowedProtocols = allowedProtocols == null || 
allowedProtocols.isEmpty()
                             ? DEFAULT_ALLOWED_PROTOCOLS
                             : 
allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
+    this.allowedHeaders = allowedHeaders == null
+                          ? Collections.emptySet()
+                          : 
allowedHeaders.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
   }
 
   public Set<String> getAllowedProtocols()
@@ -53,6 +61,11 @@ public class HttpInputSourceConfig
     return allowedProtocols;
   }
 
+  public Set<String> getAllowedHeaders()
+  {
+    return allowedHeaders;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -63,13 +76,16 @@ public class HttpInputSourceConfig
       return false;
     }
     HttpInputSourceConfig that = (HttpInputSourceConfig) o;
-    return Objects.equals(allowedProtocols, that.allowedProtocols);
+    return Objects.equals(allowedProtocols, that.allowedProtocols) && 
Objects.equals(
+        allowedHeaders,
+        that.allowedHeaders
+    );
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(allowedProtocols);
+    return Objects.hash(allowedProtocols, allowedHeaders);
   }
 
   @Override
@@ -77,6 +93,7 @@ public class HttpInputSourceConfig
   {
     return "HttpInputSourceConfig{" +
            "allowedProtocols=" + allowedProtocols +
+           ", allowedHeaders=" + allowedHeaders +
            '}';
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java 
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java
index 59217456dbf..8776cd27573 100644
--- 
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java
+++ 
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpEntityTest.java
@@ -19,7 +19,9 @@
 
 package org.apache.druid.data.input.impl;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.net.HttpHeaders;
+import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpServer;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.java.util.common.StringUtils;
@@ -42,6 +44,8 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLConnection;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
 
 public class HttpEntityTest
 {
@@ -96,8 +100,61 @@ public class HttpEntityTest
       server.start();
 
       URI url = new URI("http://"; + server.getAddress().getHostName() + ":" + 
server.getAddress().getPort() + "/test");
-      inputStream = HttpEntity.openInputStream(url, "", null, 0);
-      inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5);
+      inputStream = HttpEntity.openInputStream(url, "", null, 0, 
Collections.emptyMap());
+      inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5, 
Collections.emptyMap());
+      inputStream.skip(5);
+      Assert.assertTrue(IOUtils.contentEquals(inputStream, 
inputStreamPartial));
+    }
+    finally {
+      IOUtils.closeQuietly(inputStream);
+      IOUtils.closeQuietly(inputStreamPartial);
+      if (server != null) {
+        server.stop(0);
+      }
+      if (serverSocket != null) {
+        serverSocket.close();
+      }
+    }
+  }
+
+  @Test
+  public void testRequestHeaders() throws IOException, URISyntaxException
+  {
+    HttpServer server = null;
+    InputStream inputStream = null;
+    InputStream inputStreamPartial = null;
+    ServerSocket serverSocket = null;
+    Map<String, String> requestHeaders = ImmutableMap.of("r-Cookie", "test", 
"Content-Type", "application/json");
+    try {
+      serverSocket = new ServerSocket(0);
+      int port = serverSocket.getLocalPort();
+      // closing port so that the httpserver can use it. Can cause race 
conditions.
+      serverSocket.close();
+      server = HttpServer.create(new InetSocketAddress("localhost", port), 0);
+      server.createContext(
+          "/test",
+          (httpExchange) -> {
+            Headers headers = httpExchange.getRequestHeaders();
+            for (Map.Entry<String, String> entry : requestHeaders.entrySet()) {
+              Assert.assertTrue(headers.containsKey(entry.getKey()));
+              Assert.assertEquals(headers.get(entry.getKey()).get(0), 
entry.getValue());
+            }
+            String payload = "12345678910";
+            byte[] outputBytes = payload.getBytes(StandardCharsets.UTF_8);
+            httpExchange.sendResponseHeaders(200, outputBytes.length);
+            OutputStream os = httpExchange.getResponseBody();
+            httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_TYPE, 
"application/octet-stream");
+            httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_LENGTH, 
String.valueOf(outputBytes.length));
+            httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_RANGE, 
"bytes 0");
+            os.write(outputBytes);
+            os.close();
+          }
+      );
+      server.start();
+
+      URI url = new URI("http://"; + server.getAddress().getHostName() + ":" + 
server.getAddress().getPort() + "/test");
+      inputStream = HttpEntity.openInputStream(url, "", null, 0, 
requestHeaders);
+      inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5, 
requestHeaders);
       inputStream.skip(5);
       Assert.assertTrue(IOUtils.contentEquals(inputStream, 
inputStreamPartial));
     }
@@ -119,7 +176,7 @@ public class HttpEntityTest
     long offset = 15;
     String contentRange = StringUtils.format("bytes %d-%d/%d", offset, 1000, 
1000);
     
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn(contentRange);
-    HttpEntity.openInputStream(uri, "", null, offset);
+    HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap());
     Mockito.verify(inputStreamMock, Mockito.times(0)).skip(offset);
   }
 
@@ -128,7 +185,7 @@ public class HttpEntityTest
   {
     long offset = 15;
     
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn(null);
-    HttpEntity.openInputStream(uri, "", null, offset);
+    HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap());
     Mockito.verify(inputStreamMock, Mockito.times(1)).skip(offset);
   }
 
@@ -137,7 +194,7 @@ public class HttpEntityTest
   {
     long offset = 15;
     
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn("token
 2-12/12");
-    HttpEntity.openInputStream(uri, "", null, offset);
+    HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap());
     Mockito.verify(inputStreamMock, Mockito.times(1)).skip(offset);
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
 
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
index 7f88fc7bf62..74cc62b4d81 100644
--- 
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
+++ 
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
@@ -24,6 +24,8 @@ import nl.jqno.equalsverifier.EqualsVerifier;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+
 public class HttpInputSourceConfigTest
 {
   @Test
@@ -35,21 +37,33 @@ public class HttpInputSourceConfigTest
   @Test
   public void testNullAllowedProtocolsUseDefault()
   {
-    HttpInputSourceConfig config = new HttpInputSourceConfig(null);
+    HttpInputSourceConfig config = new HttpInputSourceConfig(null, null);
     Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, 
config.getAllowedProtocols());
+    Assert.assertEquals(Collections.emptySet(), config.getAllowedHeaders());
   }
 
   @Test
   public void testEmptyAllowedProtocolsUseDefault()
   {
-    HttpInputSourceConfig config = new 
HttpInputSourceConfig(ImmutableSet.of());
+    HttpInputSourceConfig config = new 
HttpInputSourceConfig(ImmutableSet.of(), null);
     Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, 
config.getAllowedProtocols());
   }
 
   @Test
   public void testCustomAllowedProtocols()
   {
-    HttpInputSourceConfig config = new 
HttpInputSourceConfig(ImmutableSet.of("druid"));
+    HttpInputSourceConfig config = new 
HttpInputSourceConfig(ImmutableSet.of("druid"), null);
+    Assert.assertEquals(ImmutableSet.of("druid"), 
config.getAllowedProtocols());
+  }
+
+  @Test
+  public void testAllowedHeaders()
+  {
+    HttpInputSourceConfig config = new HttpInputSourceConfig(
+        ImmutableSet.of("druid"),
+        ImmutableSet.of("Content-Type", "Referer")
+    );
     Assert.assertEquals(ImmutableSet.of("druid"), 
config.getAllowedProtocols());
+    Assert.assertEquals(ImmutableSet.of("content-type", "referer"), 
config.getAllowedHeaders());
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
 
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
index bcd6152f05d..118b56838b6 100644
--- 
a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
+++ 
b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
@@ -22,11 +22,15 @@ package org.apache.druid.data.input.impl;
 import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.systemfield.SystemField;
 import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.DefaultPasswordProvider;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -35,7 +39,10 @@ import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class HttpInputSourceTest
 {
@@ -45,7 +52,7 @@ public class HttpInputSourceTest
   @Test
   public void testSerde() throws IOException
   {
-    HttpInputSourceConfig httpInputSourceConfig = new 
HttpInputSourceConfig(null);
+    HttpInputSourceConfig httpInputSourceConfig = new 
HttpInputSourceConfig(null, null);
     final ObjectMapper mapper = new ObjectMapper();
     mapper.setInjectableValues(new Std().addValue(HttpInputSourceConfig.class, 
httpInputSourceConfig));
     final HttpInputSource source = new HttpInputSource(
@@ -53,6 +60,7 @@ public class HttpInputSourceTest
         "myName",
         new DefaultPasswordProvider("myPassword"),
         new SystemFields(EnumSet.of(SystemField.URI)),
+        null,
         httpInputSourceConfig
     );
     final byte[] json = mapper.writeValueAsBytes(source);
@@ -68,7 +76,8 @@ public class HttpInputSourceTest
         "myName",
         new DefaultPasswordProvider("myPassword"),
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
 
     new HttpInputSource(
@@ -76,7 +85,8 @@ public class HttpInputSourceTest
         "myName",
         new DefaultPasswordProvider("myPassword"),
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
 
     expectedException.expect(IllegalArgumentException.class);
@@ -86,19 +96,21 @@ public class HttpInputSourceTest
         "myName",
         new DefaultPasswordProvider("myPassword"),
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
   }
 
   @Test
   public void testConstructorAllowsOnlyCustomProtocols()
   {
-    final HttpInputSourceConfig customConfig = new 
HttpInputSourceConfig(ImmutableSet.of("druid"));
+    final HttpInputSourceConfig customConfig = new 
HttpInputSourceConfig(ImmutableSet.of("druid"), null);
     new HttpInputSource(
         ImmutableList.of(URI.create("druid:///")),
         "myName",
         new DefaultPasswordProvider("myPassword"),
         null,
+        null,
         customConfig
     );
 
@@ -109,6 +121,7 @@ public class HttpInputSourceTest
         "myName",
         new DefaultPasswordProvider("myPassword"),
         null,
+        null,
         customConfig
     );
   }
@@ -116,12 +129,13 @@ public class HttpInputSourceTest
   @Test
   public void testSystemFields()
   {
-    HttpInputSourceConfig httpInputSourceConfig = new 
HttpInputSourceConfig(null);
+    HttpInputSourceConfig httpInputSourceConfig = new 
HttpInputSourceConfig(null, null);
     final HttpInputSource inputSource = new HttpInputSource(
         ImmutableList.of(URI.create("http://test.com/http-test";)),
         "myName",
         new DefaultPasswordProvider("myPassword"),
         new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)),
+        null,
         httpInputSourceConfig
     );
 
@@ -130,10 +144,54 @@ public class HttpInputSourceTest
         inputSource.getConfiguredSystemFields()
     );
 
-    final HttpEntity entity = new 
HttpEntity(URI.create("https://example.com/foo";), null, null);
+    final HttpEntity entity = new 
HttpEntity(URI.create("https://example.com/foo";), null, null, null);
 
     Assert.assertEquals("https://example.com/foo";, 
inputSource.getSystemFieldValue(entity, SystemField.URI));
     Assert.assertEquals("/foo", inputSource.getSystemFieldValue(entity, 
SystemField.PATH));
+    Assert.assertEquals(inputSource.getRequestHeaders(), 
Collections.emptyMap());
+  }
+
+  @Test
+  public void testAllowedHeaders()
+  {
+    HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(
+        null,
+        Sets.newHashSet("R-cookie", "Content-type")
+    );
+    final HttpInputSource inputSource = new HttpInputSource(
+        ImmutableList.of(URI.create("http://test.com/http-test";)),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)),
+        ImmutableMap.of("r-Cookie", "test", "Content-Type", 
"application/json"),
+        httpInputSourceConfig
+    );
+    Set<String> expectedSet = inputSource.getRequestHeaders()
+                                         .keySet()
+                                         .stream()
+                                         .map(StringUtils::toLowerCase)
+                                         .collect(Collectors.toSet());
+    Assert.assertEquals(expectedSet, 
httpInputSourceConfig.getAllowedHeaders());
+  }
+
+  @Test
+  public void shouldFailOnForbiddenHeaders()
+  {
+    HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(
+        null,
+        Sets.newHashSet("R-cookie", "Content-type")
+    );
+    expectedException.expect(DruidException.class);
+    expectedException.expectMessage(
+        "Got forbidden header G-Cookie, allowed headers are only [r-cookie, 
content-type]");
+    new HttpInputSource(
+        ImmutableList.of(URI.create("http://test.com/http-test";)),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)),
+        ImmutableMap.of("G-Cookie", "test", "Content-Type", 
"application/json"),
+        httpInputSourceConfig
+    );
   }
 
   @Test
diff --git 
a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
 
b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
index 80602d0508a..5f1b5f365fb 100644
--- 
a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
+++ 
b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
@@ -204,7 +204,7 @@ public class InputEntityIteratingReaderTest extends 
InitializedNullHandlingTest
         ),
         CloseableIterators.withEmptyBaggage(
             ImmutableList.of(
-                new HttpEntity(new URI("testscheme://some/path"), null, null)
+                new HttpEntity(new URI("testscheme://some/path"), null, null, 
null)
                 {
                   @Override
                   protected int getMaxRetries()
diff --git 
a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
 
b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
index b4a3cf3425a..110974d18ca 100644
--- 
a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
+++ 
b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.catalog.model.table;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.catalog.model.CatalogUtils;
 import org.apache.druid.catalog.model.ColumnSpec;
@@ -27,6 +29,7 @@ import 
org.apache.druid.catalog.model.table.TableFunction.ParameterDefn;
 import org.apache.druid.catalog.model.table.TableFunction.ParameterType;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.HttpInputSource;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.metadata.DefaultPasswordProvider;
@@ -93,6 +96,7 @@ public class HttpInputSourceDefn extends 
FormattedInputSourceDefn
   public static final String PASSWORD_PARAMETER = "password";
   public static final String PASSWORD_ENV_VAR_PARAMETER = "passwordEnvVar";
 
+  public static final String HEADERS = "headers";
   private static final List<ParameterDefn> URI_PARAMS = 
Collections.singletonList(
       new Parameter(URIS_PARAMETER, ParameterType.VARCHAR_ARRAY, true)
   );
@@ -103,10 +107,15 @@ public class HttpInputSourceDefn extends 
FormattedInputSourceDefn
       new Parameter(PASSWORD_ENV_VAR_PARAMETER, ParameterType.VARCHAR, true)
   );
 
+  private static final List<ParameterDefn> HEADERS_PARAMS = 
Collections.singletonList(
+      new Parameter(HEADERS, ParameterType.VARCHAR, true)
+  );
+
   // Field names in the HttpInputSource
   protected static final String URIS_FIELD = "uris";
   protected static final String PASSWORD_FIELD = "httpAuthenticationPassword";
   protected static final String USERNAME_FIELD = "httpAuthenticationUsername";
+  protected static final String HEADERS_FIELD = "requestHeaders";
 
   @Override
   public String typeValue()
@@ -201,7 +210,7 @@ public class HttpInputSourceDefn extends 
FormattedInputSourceDefn
   @Override
   protected List<ParameterDefn> adHocTableFnParameters()
   {
-    return CatalogUtils.concatLists(URI_PARAMS, USER_PWD_PARAMS);
+    return CatalogUtils.concatLists(URI_PARAMS, 
CatalogUtils.concatLists(USER_PWD_PARAMS, HEADERS_PARAMS));
   }
 
   @Override
@@ -210,6 +219,7 @@ public class HttpInputSourceDefn extends 
FormattedInputSourceDefn
     jsonMap.put(InputSource.TYPE_PROPERTY, HttpInputSource.TYPE_KEY);
     convertUriArg(jsonMap, args);
     convertUserPasswordArgs(jsonMap, args);
+    convertHeaderArg(jsonMap, args);
   }
 
   @Override
@@ -228,6 +238,10 @@ public class HttpInputSourceDefn extends 
FormattedInputSourceDefn
       params = CatalogUtils.concatLists(params, USER_PWD_PARAMS);
     }
 
+    if (!sourceMap.containsKey(HEADERS_FIELD)) {
+      params = CatalogUtils.concatLists(params, HEADERS_PARAMS);
+    }
+
     // Does the table define a format?
     if (table.inputFormatMap == null) {
       params = addFormatParameters(params);
@@ -255,6 +269,9 @@ public class HttpInputSourceDefn extends 
FormattedInputSourceDefn
     if (!sourceMap.containsKey(USERNAME_FIELD) && 
!sourceMap.containsKey(PASSWORD_FIELD)) {
       convertUserPasswordArgs(sourceMap, args);
     }
+    if (!sourceMap.containsKey(HEADERS_FIELD)) {
+      convertHeaderArg(sourceMap, args);
+    }
     return convertPartialFormattedTable(table, args, columns, sourceMap);
   }
 
@@ -283,6 +300,26 @@ public class HttpInputSourceDefn extends 
FormattedInputSourceDefn
     }
   }
 
+  /**
+   * URIs in SQL is in the form of a string that contains a comma-delimited
+   * set of URIs. Done since SQL doesn't support array scalars.
+   */
+  private void convertHeaderArg(Map<String, Object> jsonMap, Map<String, 
Object> args)
+  {
+    String requestHeaders = CatalogUtils.getString(args, HEADERS);
+    Map<String, String> headersMap;
+    if (requestHeaders != null) {
+      try {
+        headersMap = DefaultObjectMapper.INSTANCE.readValue(requestHeaders, 
new TypeReference<Map<String, String>>(){});
+      }
+      catch (JsonProcessingException e) {
+        throw new ISE("Failed read map from headers json");
+      }
+      jsonMap.put(HEADERS_FIELD, headersMap);
+    }
+
+  }
+
   /**
    * Convert the user name and password. All are SQL strings. Passwords must 
be in
    * the form of a password provider, so do the needed conversion. HTTP 
provides
diff --git 
a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
 
b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
index 8c8129bf0fc..1992f98e2ff 100644
--- 
a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
+++ 
b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
@@ -170,7 +170,8 @@ public class ExternalTableTest extends BaseExternTableTest
         "bob",
         new DefaultPasswordProvider("secret"),
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
     Map<String, Object> sourceMap = toMap(inputSource);
     sourceMap.remove("uris");
@@ -195,7 +196,8 @@ public class ExternalTableTest extends BaseExternTableTest
         "bob",
         new DefaultPasswordProvider("secret"),
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
     TableMetadata table = TableBuilder.external("koala")
         .inputSource(toMap(inputSource))
diff --git 
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
 
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
index 8a6385db59c..e60139824d7 100644
--- 
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
+++ 
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
@@ -57,7 +57,7 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
   {
     mapper.setInjectableValues(new InjectableValues.Std().addValue(
         HttpInputSourceConfig.class,
-        new 
HttpInputSourceConfig(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS)
+        new 
HttpInputSourceConfig(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, null)
     ));
   }
 
@@ -99,7 +99,8 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         null,
         null,
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
     TableMetadata table = TableBuilder.external("foo")
         .inputSource(toMap(inputSource))
@@ -119,7 +120,8 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         null,
         null,
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
     TableMetadata table = TableBuilder.external("foo")
         .inputSource(toMap(inputSource))
@@ -150,7 +152,8 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         null,
         null,
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
     TableMetadata table = TableBuilder.external("foo")
         .inputSource(toMap(inputSource))
@@ -216,7 +219,8 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         "bob",
         new DefaultPasswordProvider("secret"),
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
     TableMetadata table = TableBuilder.external("foo")
         .inputSource(toMap(inputSource))
@@ -272,11 +276,12 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
 
     // Get the partial table function
     TableFunction fn = externDefn.tableFn(resolved);
-    assertEquals(4, fn.parameters().size());
+    assertEquals(5, fn.parameters().size());
     assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER));
     assertTrue(hasParam(fn, HttpInputSourceDefn.USER_PARAMETER));
     assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_PARAMETER));
     assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_ENV_VAR_PARAMETER));
+    assertTrue(hasParam(fn, HttpInputSourceDefn.HEADERS));
 
     // Convert to an external table.
     ExternalTableSpec externSpec = fn.apply(
@@ -320,8 +325,9 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
 
     // Get the partial table function
     TableFunction fn = externDefn.tableFn(resolved);
-    assertEquals(1, fn.parameters().size());
+    assertEquals(2, fn.parameters().size());
     assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER));
+    assertTrue(hasParam(fn, HttpInputSourceDefn.HEADERS));
 
     // Convert to an external table.
     ExternalTableSpec externSpec = fn.apply(
@@ -344,7 +350,8 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         "bob",
         new DefaultPasswordProvider("secret"),
         null,
-        new HttpInputSourceConfig(null)
+         null,
+        new HttpInputSourceConfig(null, null)
     );
     TableMetadata table = TableBuilder.external("foo")
         .inputSource(httpToMap(inputSource))
@@ -382,7 +389,8 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         "bob",
         new EnvironmentVariablePasswordProvider("SECRET"),
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
     TableMetadata table = TableBuilder.external("foo")
         .inputSource(toMap(inputSource))
@@ -415,7 +423,8 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         "bob",
         new DefaultPasswordProvider("secret"),
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
     TableMetadata table = TableBuilder.external("foo")
         .inputSource(httpToMap(inputSource))
@@ -484,7 +493,8 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         "bob",
         new EnvironmentVariablePasswordProvider("SECRET"),
         null,
-        new HttpInputSourceConfig(null)
+        null,
+        new HttpInputSourceConfig(null, null)
     );
     TableMetadata table = TableBuilder.external("foo")
         .inputSource(toMap(inputSource))
@@ -518,7 +528,6 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
       assertEquals("secret", ((DefaultPasswordProvider) 
sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
     }
     assertEquals("http://foo.com/my.csv", 
sourceSpec.getUris().get(0).toString());
-
     // Just a sanity check: details of CSV conversion are tested elsewhere.
     CsvInputFormat csvFormat = (CsvInputFormat) externSpec.inputFormat;
     assertEquals(Arrays.asList("x", "y"), csvFormat.getColumns());
diff --git 
a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
 
b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
index 074b1dbeb22..496f45596a2 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
@@ -79,7 +79,7 @@ public class InputSourceModuleTest
     Properties props = new Properties();
     Injector injector = makeInjectorWithProperties(props);
     HttpInputSourceConfig instance = 
injector.getInstance(HttpInputSourceConfig.class);
-    Assert.assertEquals(new HttpInputSourceConfig(null), instance);
+    Assert.assertEquals(new HttpInputSourceConfig(null, null), instance);
     Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, 
instance.getAllowedProtocols());
   }
 
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index ed4aa6d91eb..fbcb0735c86 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -860,7 +860,7 @@ public class CalciteInsertDmlTest extends 
CalciteIngestionDmlTest
 
 
     // Test correctness of the query when only the CLUSTERED BY clause is 
present
-    final String explanation = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"join\",\"left\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://boo.gz\"]},\"inputFormat\":{\"type\":\"json\"},\"signature\":[{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}]},\"right\":{
 [...]
+    final String explanation = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"join\",\"left\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://boo.gz\"],\"requestHeaders\":{}},\"inputFormat\":{\"type\":\"json\"},\"signature\":[{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"S
 [...]
 
     testQuery(
         PLANNER_CONFIG_NATIVE_QUERY_EXPLAIN,
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
index 29200730fee..7401477e6d7 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.calcite.avatica.SqlType;
 import org.apache.druid.catalog.model.Columns;
 import org.apache.druid.data.input.impl.CsvInputFormat;
@@ -86,7 +87,8 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
           "bob",
           new DefaultPasswordProvider("secret"),
           SystemFields.none(),
-          new HttpInputSourceConfig(null)
+          null,
+          new HttpInputSourceConfig(null, null)
       ),
       new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 
0),
       RowSignature.builder()
@@ -259,7 +261,8 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
             "bob",
             new DefaultPasswordProvider("secret"),
             SystemFields.none(),
-            new HttpInputSourceConfig(null)
+            ImmutableMap.of("Accept", "application/ndjson", "a", "b"),
+            new HttpInputSourceConfig(null, null)
         ),
         new CsvInputFormat(ImmutableList.of("timestamp", "isRobot"), null, 
false, false, 0),
         RowSignature.builder()
@@ -280,7 +283,8 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
              "  userName => 'bob',\n" +
              "  password => 'secret',\n" +
              "  uris => ARRAY['http://example.com/foo.csv', 
'http://example.com/bar.csv'],\n" +
-             "  format => 'csv'\n" +
+             "  format => 'csv',\n" +
+             "  headers=> '{\"Accept\":\"application/ndjson\", \"a\": \"b\" 
}'\n" +
              "  )\n" +
              ") EXTEND (\"timestamp\" VARCHAR, isRobot VARCHAR)\n" +
              "PARTITIONED BY HOUR")
@@ -313,7 +317,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
         "                format => 'csv'))\n" +
         "     EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" +
         "PARTITIONED BY ALL TIME";
-    final String explanation = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"}},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\";
 [...]
+    final String explanation = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"},\"requestHeaders\":{}},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\
 [...]
     final String resources = 
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
     final String attributes = 
"{\"statementType\":\"INSERT\",\"targetDataSource\":\"dst\",\"partitionedBy\":{\"type\":\"all\"}}";
 
@@ -390,7 +394,8 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
             "bob",
             new DefaultPasswordProvider("secret"),
             SystemFields.none(),
-            new HttpInputSourceConfig(null)
+            null,
+            new HttpInputSourceConfig(null, null)
         ),
         new JsonInputFormat(null, null, null, null, null),
         RowSignature.builder()
diff --git 
a/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt 
b/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt
index 18dcef56af4..f9e4c4a5de2 100644
--- a/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt
+++ b/sql/src/test/resources/calcite/expected/ingest/httpExtern-logicalPlan.txt
@@ -1,3 +1,3 @@
 LogicalInsert(target=[dst], partitionedBy=[ALL TIME], clusteredBy=[<none>])
   LogicalProject(inputs=[0..2])
-    
ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}])
+    
ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"},"requestHeaders":{}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to