paul-rogers commented on code in PR #13627:
URL: https://github.com/apache/druid/pull/13627#discussion_r1067363578


##########
docs/multi-stage-query/reference.md:
##########
@@ -88,93 +118,260 @@ FROM TABLE(
   http(
     userName => 'bob',
     password => 'secret',
-    uris => 'http:foo.com/bar.csv',
+    uris => ARRAY['http://example.com/foo.csv', 'http://example.com/bar.csv'],
     format => 'csv'
     )
   ) (x VARCHAR, y VARCHAR, z BIGINT)
 ```
 
+#### Function Arguments
 
-The set of table functions and formats is preliminary in this release.
+These table functions are intended for use with the SQL by-name argument syntax
+as shown above. Because the functions include all parameters for all formats,
+using positional calls is both cumbersome and error-prone.
+
+Function argument names are generally the same as the JSON field names, except
+as noted below. Each argument has a SQL type which matches the JSON type. For
+arguments that take a string list in JSON, use the SQL `ARRAY[...]` syntax in
+SQL as shown in the above example.
+
+Array parameters are good candidates for use in parameterized queries. That is:
+
+```sql
+SELECT
+ <column>
+FROM TABLE(
+  http(
+    userName => 'bob',
+    password => 'secret',
+    uris => ?,
+    format => 'csv'
+    )
+  ) (x VARCHAR, y VARCHAR, z BIGINT)
+```
 
-#### `HTTP`
+Provide the list of URIs (in this case) as a query parameter in each ingest. Doing
+so is simpler than writing a script to insert the array into the SQL text.
 
-The `HTTP` table function represents the `HttpInputSource` class in Druid 
which allows you to
-read from an HTTP server. The function accepts the following arguments:
+#### `HTTP` Function
 
-| Name | Description | JSON equivalent | Required |
-| ---- | ----------- | --------------- | -------- |
-| `userName` | Basic authentication user name | `httpAuthenticationUsername` | 
No |
-| `password` | Basic authentication password | `httpAuthenticationPassword` | 
No |
-| `passwordEnvVar` | Environment variable that contains the basic 
authentication password| `httpAuthenticationPassword` | No |
-| `uris` | Comma-separated list of URIs to read. | `uris` | Yes |
+The `HTTP` table function represents the
+[HTTP input source](../ingestion/native-batch-input-sources.md#http-input-source)
+to read from an HTTP server. The function accepts the following arguments:
 
-#### `INLINE`
+* `userName` (`VARCHAR`) - Same as JSON `httpAuthenticationUsername`.
+* `password` (`VARCHAR`) - Same as JSON `httpAuthenticationPassword` when used with the default option.
+* `passwordEnvVar` (`VARCHAR`) - Same as JSON `httpAuthenticationPassword` when used with
+  the `"type": "environment"` option.
+* `uris` (`ARRAY` of `VARCHAR`)
 
-The `INLINE` table function represents the `InlineInputSource` class in Druid 
which provides
-data directly in the table function. The function accepts the following 
arguments:
+#### `INLINE` Function
 
-| Name | Description | JSON equivalent | Required |
-| ---- | ----------- | --------------- | -------- |
-| `data` | Text lines of inline data. Separate lines with a newline. | `data` 
| Yes |
+The `INLINE` table function represents the
+[Inline input source](../ingestion/native-batch-input-sources.md#inline-input-source)
+which provides data directly in the table function. Parameter:
 
-#### `LOCALFILES`
+* `data` (`ARRAY` of `VARCHAR`) - Data lines, without a trailing newline, as an array.
 
-The `LOCALFILES` table function represents the `LocalInputSource` class in 
Druid which reads
+Example:
+
+```sql
+FROM TABLE(
+  inline(
+    data => ARRAY[
+       'a,b',
+       'c,d'],
+    format => 'csv'
+    )
+  ) (x VARCHAR, y VARCHAR)
+```
+
+
+#### `LOCALFILES` Function
+
+The `LOCALFILES` table function represents the
+[Local input source](../ingestion/native-batch-input-sources.md#local-input-source) which reads
 files from the file system of the node running Druid. This is most useful for single-node
-installations. The function accepts the following arguments:
+installations. The function accepts the following parameters:
 
-| Name | Description | JSON equivalent | Required |
-| ---- | ----------- | --------------- | -------- |
-| `baseDir` | Directory to read from. | `baseDir` | No |
-| `filter` | Filter pattern to read. Example: `*.csv`. | `filter` | No |
-| `files` | Comma-separated list of files to read. | `files` | No |
+* `baseDir`
+* `filter`
+* `files`
 
-You must either provide the `baseDir` or the list of `files`. You can provide 
both, in which case
-the files are assumed relative to the `baseDir`. If you provide a `filter`, 
you must provide the
-`baseDir`.
+When the local files input source is used directly in an `extern` function or ingestion spec, you
+can provide either `baseDir` and `filter` or `files` but not both. This function, however, allows
+you to provide any of the following combinations:
 
-#### Table Function Format
+* `baseDir` - Matches all files in the given directory. (Assumes the filter is `*`.)
+* `baseDir` and `filter` - Matches files in the given directory using the filter.
+* `baseDir` and `files` - A set of files relative to `baseDir`.
+* `files` - The files should be absolute paths; otherwise they are resolved relative to Druid's
+  working directory (usually the Druid install directory).
 
-Each of the table functions above requires that you specify a format.
+Examples:
 
-| Name | Description | JSON equivalent | Required |
-| ---- | ----------- | --------------- | -------- |
-| `format` | The input format, using the same names as for `EXTERN`. | 
`inputFormat.type` | Yes |
+```sql
+  -- All files in /tmp, which must be CSV files
+  localfiles(baseDir => '/tmp',
+             format => 'csv')
+
+  -- CSV files in /tmp
+  localfiles(baseDir => '/tmp',
+             filter => '*.csv',
+             format => 'csv')
+
+  -- /tmp/a.csv and /tmp/b.csv
+  localfiles(baseDir => '/tmp',
+             files => ARRAY['a.csv', 'b.csv'],
+             format => 'csv')
+
+  -- /tmp/a.csv and /tmp/b.csv
+  localfiles(files => ARRAY['/tmp/a.csv', '/tmp/b.csv'],
+             format => 'csv')
+```
 
-#### CSV Format
+#### `S3` Function
 
-Use the `csv` format to read from CSV. This choice selects the Druid 
`CsvInputFormat` class.
+The `S3` table function represents the
+[S3 input source](../ingestion/native-batch-input-sources.md#s3-input-source) which reads
+files from an S3 bucket. The function accepts the following parameters to specify the
+objects to read:
 
-| Name | Description | JSON equivalent | Required |
-| ---- | ----------- | --------------- | -------- |
-| `listDelimiter` | The delimiter to use for fields that represent a list of 
strings. | `listDelimiter` | No |
-| `skipRows` | The number of rows to skip at the start of the file. Default is 
0. | `skipHeaderRows` | No |
+* `uris` (`ARRAY` of `VARCHAR`)
+* `prefix` (`VARCHAR`) - Corresponds to the JSON `prefixes` property, but allows a single
+  prefix.
+* `bucket` (`VARCHAR`) - Corresponds to the `bucket` field of the `objects` JSON field. SQL
+  does not have syntax for an array of objects. Instead, this function takes a single bucket,
+  and one or more objects within that bucket.
+* `paths` (`ARRAY` of `VARCHAR`) - Corresponds to the `path` fields of the `objects` JSON field.

Review Comment:
   They already appear? Check down two paragraphs. The intro to this paragraph 
says, "The function accepts the following parameters to specify the objects to 
read" while the later one says "also accepts the following security properties".



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/catalog/model/table/S3InputSourceDefn.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import org.apache.druid.catalog.model.CatalogUtils;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.table.BaseTableFunction.Parameter;
+import org.apache.druid.catalog.model.table.TableFunction.ParameterDefn;
+import org.apache.druid.catalog.model.table.TableFunction.ParameterType;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.s3.S3InputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.storage.s3.S3StorageDruidModule;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Catalog definition for the S3 input source.
+ * <p>
+ * The catalog entry contains a serialized S3 input source, with some 
simplifying variations.
+ * The catalog entry can define the {@code bucket} table property which is the 
(single) bucket
+ * value to use when creating the list of objects: the catalog provides the 
bucket, the table
+ * function provides the list of objects. (Done this way since defining two 
correlated lists
+ * in SQL is awkward, and credentials make the most sense when working with a 
single bucket.)
+ * <p>
+ * The ad-hoc function allows the various ways to specify the objects, but not 
the configuration
+ * parameters. If the user wishes to use such parameters, they should be 
defined in a catalog
+ * entry, since providing maps in SQL is awkward.
+ * <p>
+ * The partial table function can be of various forms:
+ * <ul>
+ * <li>Fully define the table, which means providing the full set of S3 properties and not
+ * providing the table-level {@code bucket} property. This form is complete and doesn't need
+ * a table function. If used with a table function, the function provides the {@code glob}
+ * parameter (if not already provided in the table spec).</li>
+ * <li>Partially define the table: using URIs with the {@code glob} to be 
provided later, or
+ * by using the {@code bucket} table property. The table function provides the 
{@code objects}
+ * parameter to specify the specific objects. This form provides both the 
format and the list
+ * of columns.</li>
+ * <li>Partially define the table as a connection: provide only the {@code 
bucket} property,
+ * and omit both the format and the columns. The table function requests the 
{@code objects}
+ * and the {@code format}. The user must provide the list of columns.</li>
+ * </ul>
+ *
+ * @see {@link S3InputSource} for details on the meaning of the various 
properties, and the rules
+ * about valid combinations
+ */
+public class S3InputSourceDefn extends FormattedInputSourceDefn
+{
+  public static final String URIS_PARAMETER = "uris";
+  public static final String PREFIXES_PARAMETER = "prefixes";
+  public static final String BUCKET_PARAMETER = "bucket";
+  public static final String PATHS_PARAMETER = "paths";
+  public static final String ACCESS_KEY_ID_PARAMETER = "accessKeyId";
+  public static final String SECRET_ACCESS_KEY_PARAMETER = "secretAccessKey";
+  public static final String ASSUME_ROLE_ARN_PARAMETER = "assumeRoleArn";
+
+  /**
+   * The {@code objectGlob} property exists in S3, but is not documented. The 
corresponding
+   * function parameter also exists, but is not documented.
+   */
+  public static final String OBJECT_GLOB_PARAMETER = "objectGlob";
+
+  public static final String BUCKET_PROPERTY = "bucket";

Review Comment:
   Because this is a property (a top-level entry in the properties map for an 
external table definition), and not a field (a member of an input source 
object). Added a description of the naming convention to `InputSourceDefn`.
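
   As a rough illustration of that convention (plain maps with made-up values, not the actual catalog classes; the `source` key and the bucket and URI values are assumptions chosen only for the sketch): a property is a key of the table's properties map, while a field is a key inside the serialized input source object.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class PropertyVsFieldSketch
{
  // Hypothetical key under which the serialized input source is stored.
  static final String SOURCE_KEY = "source";

  // Table that relies on the table-level "bucket" property.
  static Map<String, Object> withBucketProperty()
  {
    Map<String, Object> inputSource = new HashMap<>();
    inputSource.put("type", "s3");

    Map<String, Object> properties = new HashMap<>();
    properties.put("bucket", "example-bucket"); // property: top-level entry
    properties.put(SOURCE_KEY, inputSource);
    return properties;
  }

  // Table that relies on the "uris" field of the input source instead.
  static Map<String, Object> withUrisField()
  {
    Map<String, Object> inputSource = new HashMap<>();
    inputSource.put("type", "s3");
    // field: member of the input source object
    inputSource.put("uris", Collections.singletonList("s3://example-bucket/data.csv"));

    Map<String, Object> properties = new HashMap<>();
    properties.put(SOURCE_KEY, inputSource);
    return properties;
  }

  public static void main(String[] args)
  {
    System.out.println("property form: " + withBucketProperty().keySet());
    System.out.println("field form: " + ((Map<?, ?>) withUrisField().get(SOURCE_KEY)).keySet());
  }
}
```

   The two forms are kept separate here because `validate()` rejects a table that sets both the `bucket` property and the `uris` field.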



##########
docs/multi-stage-query/reference.md:
##########
@@ -280,6 +481,31 @@ The following ISO 8601 periods are supported for 
`TIME_FLOOR`:
 - P3M
 - P1Y
 
+The string constant can also include any of the keywords mentioned above:
+
+- `HOUR` - Same as `'PT1H'`
+- `DAY` - Same as `'P1D'`

Review Comment:
   Removed this change because it will come in a later PR. Any of Druid's valid 
strings will work. However, WEEK isn't a valid SQL keyword, so it can't be used 
in the syntax shown earlier.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/catalog/model/table/S3InputSourceDefn.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import org.apache.druid.catalog.model.CatalogUtils;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.table.BaseTableFunction.Parameter;
+import org.apache.druid.catalog.model.table.TableFunction.ParameterDefn;
+import org.apache.druid.catalog.model.table.TableFunction.ParameterType;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.s3.S3InputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.storage.s3.S3StorageDruidModule;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Catalog definition for the S3 input source.
+ * <p>
+ * The catalog entry contains a serialized S3 input source, with some 
simplifying variations.
+ * The catalog entry can define the {@code bucket} table property which is the 
(single) bucket
+ * value to use when creating the list of objects: the catalog provides the 
bucket, the table
+ * function provides the list of objects. (Done this way since defining two 
correlated lists
+ * in SQL is awkward, and credentials make the most sense when working with a 
single bucket.)
+ * <p>
+ * The ad-hoc function allows the various ways to specify the objects, but not 
the configuration
+ * parameters. If the user wishes to use such parameters, they should be 
defined in a catalog
+ * entry, since providing maps in SQL is awkward.
+ * <p>
+ * The partial table function can be of various forms:
+ * <ul>
+ * <li>Fully define the table, which means providing the full set of S3 properties and not
+ * providing the table-level {@code bucket} property. This form is complete and doesn't need
+ * a table function. If used with a table function, the function provides the {@code glob}
+ * parameter (if not already provided in the table spec).</li>
+ * <li>Partially define the table: using URIs with the {@code glob} to be 
provided later, or
+ * by using the {@code bucket} table property. The table function provides the 
{@code objects}
+ * parameter to specify the specific objects. This form provides both the 
format and the list
+ * of columns.</li>
+ * <li>Partially define the table as a connection: provide only the {@code 
bucket} property,
+ * and omit both the format and the columns. The table function requests the 
{@code objects}
+ * and the {@code format}. The user must provide the list of columns.</li>
+ * </ul>
+ *
+ * @see {@link S3InputSource} for details on the meaning of the various 
properties, and the rules
+ * about valid combinations
+ */
+public class S3InputSourceDefn extends FormattedInputSourceDefn
+{
+  public static final String URIS_PARAMETER = "uris";
+  public static final String PREFIXES_PARAMETER = "prefixes";
+  public static final String BUCKET_PARAMETER = "bucket";
+  public static final String PATHS_PARAMETER = "paths";
+  public static final String ACCESS_KEY_ID_PARAMETER = "accessKeyId";
+  public static final String SECRET_ACCESS_KEY_PARAMETER = "secretAccessKey";
+  public static final String ASSUME_ROLE_ARN_PARAMETER = "assumeRoleArn";
+
+  /**
+   * The {@code objectGlob} property exists in S3, but is not documented. The 
corresponding
+   * function parameter also exists, but is not documented.
+   */
+  public static final String OBJECT_GLOB_PARAMETER = "objectGlob";
+
+  public static final String BUCKET_PROPERTY = "bucket";
+
+  private static final ParameterDefn URI_PARAM_DEFN = new 
Parameter(URIS_PARAMETER, ParameterType.VARCHAR_ARRAY, true);
+  private static final ParameterDefn PREFIXES_PARAM_DEFN = new 
Parameter(PREFIXES_PARAMETER, ParameterType.VARCHAR_ARRAY, true);
+  private static final ParameterDefn BUCKET_PARAM_DEFN = new 
Parameter(BUCKET_PARAMETER, ParameterType.VARCHAR, true);
+  private static final ParameterDefn PATHS_PARAM_DEFN = new 
Parameter(PATHS_PARAMETER, ParameterType.VARCHAR_ARRAY, true);
+  private static final ParameterDefn OBJECTS_GLOB_PARAM_DEFN = new 
Parameter(OBJECT_GLOB_PARAMETER, ParameterType.VARCHAR, true);
+  private static final List<ParameterDefn> SECURITY_PARAMS = Arrays.asList(
+      new Parameter(ACCESS_KEY_ID_PARAMETER, ParameterType.VARCHAR, true),
+      new Parameter(SECRET_ACCESS_KEY_PARAMETER, ParameterType.VARCHAR, true),
+      new Parameter(ASSUME_ROLE_ARN_PARAMETER, ParameterType.VARCHAR, true)
+  );
+
+  // Field names in the S3InputSource
+  private static final String URIS_FIELD = "uris";
+  private static final String PREFIXES_FIELD = "prefixes";
+  private static final String OBJECTS_FIELD = "objects";
+  private static final String OBJECT_GLOB_FIELD = "objectGlob";
+  private static final String PROPERTIES_FIELD = "properties";
+  private static final String ACCESS_KEY_ID_FIELD = "accessKeyId";
+  private static final String SECRET_ACCESS_KEY_FIELD = "secretAccessKey";
+  private static final String ASSUME_ROLE_ARN_FIELD = "assumeRoleArn";
+
+  @Override
+  public String typeValue()
+  {
+    return S3StorageDruidModule.SCHEME;
+  }
+
+  @Override
+  protected Class<? extends InputSource> inputSourceClass()
+  {
+    return S3InputSource.class;
+  }
+
+  @Override
+  public void validate(ResolvedExternalTable table)
+  {
+    final boolean hasFormat = table.inputFormatMap != null;
+    final boolean hasColumns = 
!CollectionUtils.isNullOrEmpty(table.resolvedTable().spec().columns());
+
+    if (hasFormat && !hasColumns) {
+      throw new IAE(
+          "An external S3 table with a format must also provide the 
corresponding columns"
+      );
+    }
+
+    // The user can either provide a bucket, or can provide one of the valid 
items.
+    final String bucket = 
table.resolvedTable().stringProperty(BUCKET_PROPERTY);
+    final boolean hasBucket = bucket != null;
+    final Map<String, Object> sourceMap = table.inputSourceMap;
+    final boolean hasUris = sourceMap.containsKey(URIS_FIELD);
+    final boolean hasPrefix = sourceMap.containsKey(PREFIXES_FIELD);
+    final boolean hasObjects = sourceMap.containsKey(OBJECTS_FIELD);
+    final boolean hasGlob = sourceMap.containsKey(OBJECT_GLOB_FIELD);
+    if (hasBucket) {
+      if (hasUris || hasPrefix || hasObjects) {
+        throw new IAE(
+            "Provide either the %s property, or one of the S3 input source 
fields %s, %s or %s, but not both.",
+            BUCKET_PROPERTY,
+            URIS_FIELD,
+            PREFIXES_FIELD,
+            OBJECTS_FIELD
+        );
+      }
+      if (hasGlob) {
+        throw new IAE(
+            "The %s property cannot be provided when the %s property is set",
+            OBJECT_GLOB_FIELD,
+            BUCKET_PROPERTY
+        );
+      }
+
+      // Patch in a dummy URI so that validation of the rest of the fields
+      // will pass.
+      sourceMap.put(URIS_FIELD, Collections.singletonList(bucket));
+    }
+    super.validate(table);
+  }
+
+  @Override
+  protected List<ParameterDefn> adHocTableFnParameters()
+  {
+    return CatalogUtils.concatLists(
+        Arrays.asList(
+            URI_PARAM_DEFN,
+            PREFIXES_PARAM_DEFN,
+            BUCKET_PARAM_DEFN,
+            PATHS_PARAM_DEFN,
+            OBJECTS_GLOB_PARAM_DEFN
+        ),
+        SECURITY_PARAMS
+    );
+  }
+
+  @Override
+  protected void convertArgsToSourceMap(Map<String, Object> jsonMap, 
Map<String, Object> args)
+  {
+    jsonMap.put(InputSource.TYPE_PROPERTY, S3StorageDruidModule.SCHEME);
+    final List<String> uris = CatalogUtils.getStringArray(args, 
URIS_PARAMETER);
+    final List<String> prefixes = CatalogUtils.getStringArray(args, 
PREFIXES_PARAMETER);
+    final String bucket = CatalogUtils.getNonBlankString(args, 
BUCKET_PARAMETER);
+    final List<String> paths = CatalogUtils.getStringArray(args, 
PATHS_PARAMETER);
+    final String objectGlob = CatalogUtils.getNonBlankString(args, 
OBJECT_GLOB_PARAMETER);
+    final boolean hasUris = uris != null;
+    final boolean hasPrefixes = prefixes != null;
+    final boolean hasBucket = bucket != null;
+    final boolean hasPaths = !CollectionUtils.isNullOrEmpty(paths);
+    if (hasPaths && !hasBucket) {
+      throw new IAE(
+          "S3 requires the %s parameter if %s is set",
+          BUCKET_PARAMETER,
+          PATHS_PARAMETER
+      );
+    }
+    // uris, prefixes and bucket (with paths) are mutually exclusive.
+    if ((hasUris && (hasPrefixes || hasBucket)) || (hasPrefixes && hasBucket)) {
+      throw new IAE(
+          "S3 accepts only one of %s, %s or %s",
+          URIS_PARAMETER,
+          BUCKET_PARAMETER,
+          PREFIXES_PARAMETER
+      );
+    }
+    if (!hasUris && !hasPrefixes && !hasBucket) {
+      throw new IAE(
+          "S3 requires one of %s, %s or %s",
+          URIS_PARAMETER,
+          BUCKET_PARAMETER,
+          PREFIXES_PARAMETER
+      );
+    }
+    if (hasUris) {
+      jsonMap.put(URIS_FIELD, CatalogUtils.stringListToUriList(uris));
+    }
+    if (hasPrefixes) {
+      jsonMap.put(PREFIXES_FIELD, prefixes);
+    }
+    if (hasBucket) {
+      if (!hasPaths) {
+        throw new IAE("When using the %s parameter, %s must also be provided", 
BUCKET_PARAMETER, PATHS_PARAMETER);
+      }
+      List<CloudObjectLocation> objects = new ArrayList<>();
+      for (String obj : paths) {
+        objects.add(new CloudObjectLocation(bucket, obj));
+      }
+      jsonMap.put(OBJECTS_FIELD, objects);
+    }
+    if (objectGlob != null) {
+      jsonMap.put(OBJECT_GLOB_FIELD, objectGlob);

Review Comment:
   I don't think so. It's been a while since I did this mapping; I'd have to go 
back and plow through the code again to reverify what the S3 input source does 
and does not allow. What would it mean to provide a list of files and a filter?



##########
server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for input source definitions.
+ *
+ * @see {@link FormattedInputSourceDefn} for the base class for (most) input sources
+ * which take an input format.
+ */
+public abstract class BaseInputSourceDefn implements InputSourceDefn
+{
+  /**
+   * The "from-scratch" table function for this input source. The parameters
+   * are those defined by the subclass, and the apply simply turns around and
+   * asks the input source definition to do the conversion.
+   */
+  public class AdHocTableFunction extends BaseTableFunction
+  {
+    public AdHocTableFunction(List<ParameterDefn> parameters)
+    {
+      super(parameters);
+    }
+
+    @Override
+    public ExternalTableSpec apply(
+        final String fnName,
+        final Map<String, Object> args,
+        final List<ColumnSpec> columns,
+        final ObjectMapper jsonMapper
+    )
+    {
+      requireSchema(fnName, columns);
+      return convertArgsToTable(args, columns, jsonMapper);
+    }
+  }
+
+  /**
+   * The "partial" table function that starts with a catalog external table 
spec, then
+   * uses SQL function arguments to "complete" (i.e. fill in) the missing 
properties to
+   * produce a complete table which is then converted to an external table 
which Calcite
+   * can use.
+   * <p>
+   * The set of parameters depends on the input source and on whether or not 
the catalog
+   * spec provides a format.
+   */
+  public class PartialTableFunction extends BaseTableFunction
+  {
+    private final ResolvedExternalTable table;
+
+    public PartialTableFunction(final ResolvedExternalTable table, 
List<ParameterDefn> params)
+    {
+      super(params);
+      this.table = table;
+    }
+
+    @Override
+    public ExternalTableSpec apply(
+        final String fnName,
+        final Map<String, Object> args,
+        final List<ColumnSpec> columns,
+        final ObjectMapper jsonMapper
+    )
+    {
+      if 
(CollectionUtils.isNullOrEmpty(table.resolvedTable().spec().columns())) {
+        requireSchema(fnName, columns);
+      }
+      return convertCompletedTable(table, args, columns);
+    }
+  }
+
+  /**
+   * The one and only from-scratch table function for this input source. The
+   * function is defined at bind time, not construction time, since it typically
+   * needs visibility to the set of available input formats.
+   */
+  private AdHocTableFunction adHocTableFn;
+
+  /**
+   * Overridden by each subclass to return the input source class to be
+   * used for JSON conversions.
+   */
+  protected abstract Class<? extends InputSource> inputSourceClass();
+
+  @Override
+  public void bind(TableDefnRegistry registry)
+  {
+    this.adHocTableFn = defineAdHocTableFunction();
+  }
+
+  @Override
+  public void validate(ResolvedExternalTable table)
+  {
+    convertTableToSource(table);
+  }
+
+  /**
+   * Overridden by each subclass to define the parameters needed by each
+   * input source.
+   */
+  protected abstract AdHocTableFunction defineAdHocTableFunction();
+
+  @Override
+  public TableFunction adHocTableFn()
+  {
+    return adHocTableFn;
+  }
+
+  /**
+   * Define a table "from scratch" using SQL function arguments.
+   */
+  protected ExternalTableSpec convertArgsToTable(
+      final Map<String, Object> args,
+      final List<ColumnSpec> columns,
+      final ObjectMapper jsonMapper
+  )
+  {
+    return new ExternalTableSpec(
+        convertArgsToSource(args, jsonMapper),
+        convertArgsToFormat(args, columns, jsonMapper),
+        Columns.convertSignature(columns)
+    );
+  }
+
+  /**
+   * Convert the input source using arguments to a "from scratch" table 
function.
+   */
+  protected InputSource convertArgsToSource(Map<String, Object> args, 
ObjectMapper jsonMapper)
+  {
+    final Map<String, Object> jsonMap = new HashMap<>();
+    auditInputSource(jsonMap);
+    convertArgsToSourceMap(jsonMap, args);
+    return convertSource(jsonMap, jsonMapper);
+  }
+
+  /**
+   * Convert SQL arguments to the corresponding "generic JSON" form in the 
given map.
+   * The map will then be adjusted and converted to the actual input source.
+   */
+  protected abstract void convertArgsToSourceMap(Map<String, Object> jsonMap, 
Map<String, Object> args);
+
+  /**
+   * Convert SQL arguments, and the column schema, to an input format, if 
required.
+   */
+  protected InputFormat convertArgsToFormat(Map<String, Object> args, 
List<ColumnSpec> columns, ObjectMapper jsonMapper)
+  {
+    return null;
+  }
+
+  /**
+   * Complete a partial table using the table function arguments and columns provided.
+   * The arguments match the set of parameters used for the function. The columns are
+   * provided if the SQL included an {@code EXTEND} clause: the implementation should decide
+   * if columns are required (or allowed) depending on whether the partial spec already
+   * defines columns.
+   *
+   * @param table   the partial table spec, with input source and format 
parsed into a
+   *                generic Java map
+   * @param args    the argument values provided in the SQL table function 
call. The arguments
+   *                use the Java types defined in the parameter definitions.
+   * @param columns the set of columns (if any) from the SQL {@code EXTEND} 
clause
+   *
+   * @return an external table spec which Calcite can consume
+   */
+  protected abstract ExternalTableSpec convertCompletedTable(

Review Comment:
   Conversion from a set of parameter values to an external table is a function 
of the input source and the specific way that we use the table function. We 
don't want to couple the table function any more tightly to the input source 
than necessary. This method call is the minimum possible coupling. It also 
leaves input source logic in the input source.
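
   A stripped-down sketch of that shape, using hypothetical stand-in types (`SourceDefn`, `PartialFn`, and string-valued tables are illustrative only, not the classes in this PR): the function holds the partial table and forwards to the input source definition through a single call.

```java
import java.util.Collections;
import java.util.Map;

class CouplingSketch
{
  /** Hypothetical stand-in for an input source definition. */
  interface SourceDefn
  {
    String convertCompletedTable(String partialTable, Map<String, Object> args);
  }

  /** Hypothetical stand-in for the partial table function. */
  static class PartialFn
  {
    private final SourceDefn defn;
    private final String partialTable;

    PartialFn(SourceDefn defn, String partialTable)
    {
      this.defn = defn;
      this.partialTable = partialTable;
    }

    String apply(Map<String, Object> args)
    {
      // The only coupling point: hand everything to the input source definition.
      return defn.convertCompletedTable(partialTable, args);
    }
  }

  static String demo()
  {
    // Source-specific logic lives in the definition, not in the function.
    SourceDefn s3Like = (table, args) -> table + "/" + args.get("paths");
    return new PartialFn(s3Like, "example-bucket")
        .apply(Collections.singletonMap("paths", "a.csv"));
  }

  public static void main(String[] args)
  {
    System.out.println(demo());
  }
}
```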



##########
server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java:
##########
@@ -19,65 +19,137 @@
 
 package org.apache.druid.catalog.model.table;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.catalog.model.ColumnDefn;
+import com.google.common.base.Strings;
+import org.apache.druid.catalog.model.CatalogUtils;
 import org.apache.druid.catalog.model.ColumnSpec;
 import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.ModelProperties;
+import org.apache.druid.catalog.model.ModelProperties.GranularityPropertyDefn;
+import org.apache.druid.catalog.model.ModelProperties.StringListPropertyDefn;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.catalog.model.TableDefn;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.List;
 
-/**
- * Definition of a Druid datasource. The datasource may use rollup, but rollup
- * is not represented in the catalog: it is just a way that applications store
- * data into a table.
- */
-public class DatasourceDefn extends AbstractDatasourceDefn
+public class DatasourceDefn extends TableDefn
 {
   /**
-   * Definition of a column in a datasource.
+   * Segment grain at ingestion and initial compaction. Aging rules
+   * may override the value as segments age. If not provided here,
+   * then it must be provided at ingestion time.
    */
-  public static class DatasourceColumnDefn extends ColumnDefn
-  {
-    public static final String COLUMN_TYPE = "column";
+  public static final String SEGMENT_GRANULARITY_PROPERTY = 
"segmentGranularity";
+
+  /**
+   * Catalog property value for the "all time" granularity.
+   */
+  public static final String ALL_GRANULARITY = "ALL";

Review Comment:
   It isn't? This is a "property value" not a "property".



##########
server/src/main/java/org/apache/druid/catalog/model/table/InputFormats.java:
##########
@@ -45,59 +42,85 @@
 import java.util.stream.Collectors;
 
 /**
- * Definition of the input formats which converts from property
- * lists in table specs to subclasses of {@link InputFormat}.
+ * Catalog definitions for the Druid input formats. Maps from catalog and SQL
+ * functions to the Druid input format implementations.
  */
 public class InputFormats
 {
-  public interface InputFormatDefn
-  {
-    String name();
-    String typeTag();
-    List<PropertyDefn<?>> properties();
-    void validate(ResolvedTable table);
-    InputFormat convert(ResolvedTable table);
-  }
-
+  /**
+   * Base class for input format definitions.
+   */
   public abstract static class BaseFormatDefn implements InputFormatDefn
   {
-    private final String name;
-    private final String typeTag;
-    private final List<PropertyDefn<?>> properties;
-
-    public BaseFormatDefn(
-        final String name,
-        final String typeTag,
-        final List<PropertyDefn<?>> properties
-    )
+    /**
+     * The set of SQL function parameters available when the format is
+     * specified via a SQL function. The parameters correspond to input format
+     * properties, but typically have simpler names and must require only 
simple
+     * scalar types of the kind that SQL can provide. Each subclass must 
perform
+     * conversion to the type required for Jackson conversion.
+     */
+    private final List<ParameterDefn> parameters;
+
+    public BaseFormatDefn(List<ParameterDefn> parameters)
     {
-      this.name = name;
-      this.typeTag = typeTag;
-      this.properties = properties;
+      this.parameters = parameters == null ? Collections.emptyList() : 
parameters;
     }
 
     @Override
-    public String name()
+    public List<ParameterDefn> parameters()
     {
-      return name;
+      return parameters;
     }
 
+    /**
+     * The target input format class for Jackson conversions.
+     */
+    protected abstract Class<? extends InputFormat> inputFormatClass();
+
     @Override
-    public String typeTag()
+    public void validate(ResolvedExternalTable table)
     {
-      return typeTag;
+      // Bare-bones validation: the format has to convert to the proper object.
+      // Subclasses should replace this with something fancier where needed.
+
+      if (table.inputFormatMap != null) {
+        convertFromTable(table);
+      }
     }
 
-    @Override
-    public List<PropertyDefn<?>> properties()
+    /**
+     * Convert columns from the {@link ColumnSpec} format used by the catalog to the
+     * list of names form required by input formats.
+     */
+    protected void convertColumns(Map<String, Object> jsonMap, 
List<ColumnSpec> columns)
     {
-      return properties;
+      List<String> cols = columns
+          .stream()
+          .map(col -> col.name())
+          .collect(Collectors.toList());
+      jsonMap.put("columns", cols);

Review Comment:
   Sadly, the `csv` format definition includes the list of column names. Rather 
than requiring the user to provide the list twice (as in the `extern` 
function), this code "leeches off" the column definitions used to define the 
signature so the user doesn't have to repeat themselves.
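
   A minimal sketch of the idea, not the PR's code: `Col` is a hypothetical 
stand-in for the catalog's `ColumnSpec`, and the format is modeled as a plain 
JSON-style map. The column names are copied out of the signature into the 
format's `columns` entry, so they are written only once.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ColumnNamesSketch
{
  // Hypothetical stand-in for ColumnSpec: a column name plus its SQL type.
  public static final class Col
  {
    public final String name;
    public final String sqlType;

    public Col(String name, String sqlType)
    {
      this.name = name;
      this.sqlType = sqlType;
    }
  }

  // Copy the column names from the signature into the format's JSON map,
  // so the caller does not have to list them a second time.
  public static void addColumnNames(Map<String, Object> formatMap, List<Col> columns)
  {
    List<String> names = columns
        .stream()
        .map(col -> col.name)
        .collect(Collectors.toList());
    formatMap.put("columns", names);
  }

  public static void main(String[] args)
  {
    // The signature as it would come from the (x VARCHAR, y VARCHAR, z BIGINT) clause.
    List<Col> signature = Arrays.asList(
        new Col("x", "VARCHAR"),
        new Col("y", "VARCHAR"),
        new Col("z", "BIGINT")
    );
    Map<String, Object> csvFormat = new HashMap<>();
    csvFormat.put("type", "csv");
    addColumnNames(csvFormat, signature);
    System.out.println(csvFormat.get("columns"));
  }
}
```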



##########
server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableDefn.java:
##########
@@ -19,254 +19,289 @@
 
 package org.apache.druid.catalog.model.table;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import org.apache.druid.catalog.model.CatalogUtils;
-import org.apache.druid.catalog.model.ColumnDefn;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.catalog.model.ColumnSpec;
-import org.apache.druid.catalog.model.Columns;
-import org.apache.druid.catalog.model.ModelProperties;
+import org.apache.druid.catalog.model.ModelProperties.ObjectPropertyDefn;
 import org.apache.druid.catalog.model.ModelProperties.PropertyDefn;
-import org.apache.druid.catalog.model.ParameterizedDefn;
-import org.apache.druid.catalog.model.PropertyAttributes;
 import org.apache.druid.catalog.model.ResolvedTable;
 import org.apache.druid.catalog.model.TableDefn;
-import org.apache.druid.catalog.model.table.InputFormats.InputFormatDefn;
+import org.apache.druid.catalog.model.TableDefnRegistry;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Arrays;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
- * Definition of an external input source, primarily for ingestion.
+ * Definition of an external table, primarily for ingestion.
  * The components are derived from those for Druid ingestion: an
  * input source, a format and a set of columns. Also provides
  * properties, as do all table definitions.
+ *
+ * <h4>Partial Tables and Connections</h4>
+ *
+ * An input source is a template for an external table. The input
+ * source says how to get data, and optionally the format and structure of 
that data.
+ * Since Druid never ingests the same data twice, the actual external table 
needs
+ * details that say which data to read on any specific ingestion. Thus, an external
+ * table is usually a "partial table": all the information that remains 
constant
+ * across ingestions, but without the information that changes. The changing
+ * information is typically the list of files (or objects or URLs) to ingest.
+ * <p>
+ * The pattern is:<br>
+ * {@code external table spec + parameters --> external table}
+ * <p>
+ * Since an input source is a parameterized (partial) external table, we can 
reuse
+ * the table metadata structures and APIs, avoiding the need to have a 
separate (but
+ * otherwise identical) structure for external tables.
+ *
+ * An external table can be thought of as a "connection", though Druid does not
+ * use that term. When used as a connection, the external table spec will omit 
the
+ * format. Instead, the format will also be provided at ingest time, along 
with the
+ * list of tables (or objects.)
+ * <p>
+ * To keep all this straight, we adopt the following terms:
+ * <dl>
+ * <dt>External table spec</dt>
+ * <dd>The JSON serialized version of an external table which can be partial or
+ * complete. The spec is a named entry in the Druid catalog</dd>
+ * <dt>Complete spec</dt>
+ * <dd>An external table spec that provides all information needed to access an
+ * external table. Each use identifies the same set of data. Useful if MSQ is 
used
+ * to query an external data source. A complete spec can be referenced as a
+ * first-class table in a {@code FROM} clause in an MSQ query.</dd>
+ * <dt>Partial spec</dt>
+ * <dd>An external table spec that omits some information. That information 
must
+ * be provided at query time in the form of a {@code TABLE} function. If the 
partial
+ * spec includes a format, then it is essentially a <i>partial table</i>. If it
+ * omits the format, then it is essentially a <i>connection</i>.</dd>
+ * <dt>Completed table</dt>
+ * <dd>The full external table that results from a partial spec and a set of 
SQL
+ * table function parameters.</dd>
+ * <dt>Ad-hoc table</dt>
+ * <dd>Users can define an external table using the generic {@code EXTERN} 
function
+ * or one of the input-source-specific functions. In this case, there is no
+ * catalog entry: all information comes from the SQL table function</dd>
+ * <dt>Partial table function</dt>
+ * <dd>The SQL table function used to "complete" a partial spec. The function
+ * defines parameters to fill in the missing information. The function is 
generated
+ * on demand and has the same name as the catalog entry for the partial spec.
+ * The function will include parameters for format if the catalog spec does not
+ * specify a format. Else, the format parameters are omitted and the completed
+ * table uses the format provided in the catalog spec.</dd>
+ * <dt>Ad-hoc table function</dt>
+ * <dd>The SQL table function used to create an ad-hoc external table. The function
+ * has a name defined by the {@link InputFormatDefn}, and has parameters for all
+ * supported formats: the user must specify all input source and format properties.</dd>
+ * </dl>
+ *
+ * <h4>External Table Structure</h4>
+ *
+ * The external table is generic: it represents all valid combinations of input
+ * sources and formats. Rather than have a different table definition for 
each, we
+ * instead split out input sources and formats into their own definitions, and 
those
+ * definitions are integrated and used by this external table class. As a 
result,
+ * the {@code properties} field will contain the {@code source} property which 
has
+ * the JSON serialized form of the input source (minus items to be 
parameterized.)
+ * <p>
+ * Similarly, if the external table also defines a format (rather than 
requiring the
+ * format at ingest time), then the {@code format} property holds the 
JSON-serialized
+ * form of the input format, minus columns. The columns can be provided in the 
spec,
+ * in the {@code columns} field. The {@link InputFormatDefn} converts the 
columns to
+ * the form needed by the input format.
+ * <p>
+ * Druid's input sources all require formats. However, some sources may not 
actually
+ * need the format. A JDBC input source for example, needs no format. In other 
cases,
+ * there may be a subset of formats. Each {@link InputSourceDefn} is 
responsible for
+ * working out which formats (if any) are required. This class is agnostic 
about whether
+ * the format is supplied. (Remember that, when used as a connection, the 
external table
+ * will provide no format until ingest time.)
+ * <p>
+ * By contrast, the input source is always required.
+ *
+ * <h4>Data Formats and Conversions</h4>
+ *
+ * Much of the code here handles conversion of an external table specification 
to
+ * the form needed by SQL. Since SQL is not visible here, we instead create an
+ * instance of {@link ExternalTableSpec} which holds the input source, input 
format
+ * and row signature in the form required by SQL.
+ * <p>
+ * This class handles table specifications in three forms:
+ * <ol>
+ * <li>From a fully-defined table specification, converted to a {@code 
ExternalTableSpec}
+ * by the {@link #convert(ResolvedTable)} function.</li>
+ * <li>From a fully-defined set of arguments to a SQL table function. The
+ * {@link InputSourceDefn#adHocTableFn()} method provides the function 
definition which
+ * handles the conversion.</li>
+ * <li>From a partially-defined table specification in the catalog, augmented 
by
+ * parameters passed from a SQL function. The {@link #tableFn(ResolvedTable)} 
method
+ * creates the required function by caching the table spec. That function then 
combines
+ * the parameters to produce the required {@code ExternalTableSpec}.</li>
+ * </ol>
+ * <p>
+ * To handle these formats, and the need to adjust JSON, conversion to an
+ * {@code ExternalTableSpec} occurs in multiple steps:
+ * <ul>
+ * <li>When using a table spec, the serialized JSON is first converted to a 
generic
+ * Java map: one for the input source, another for the format.</li>
+ * <li>When using a SQL function, the SQL arguments are converted (if needed) 
and
+ * written into a Java map. If the function references an existing table spec: 
then
+ * the JSON map is first populated with the deserialized spec.</li>
+ * <li>Validation and/or adjustments are made to the Java map. Adjustments are
+ * those described elsewhere in this Javadoc.</li>
+ * <li>The column specifications from either SQL or the table spec are 
converted to
+ * a list of column names, and placed into the Java map for the input 
format.</li>
+ * <li>The maps are converted to the {@link InputSource} or {@link InputFormat}
+ * objects using a Jackson conversion.</li>
+ * </ul>
+ * The actual conversions are handled in the {@link InputSourceDefn} and
+ * {@link InputFormatDefn} classes, either directly (for a fully-defined table
+ * function) or starting here (for other use cases).
+ *
+ * <h4>Property and Parameter Names</h4>
+ *
+ * Pay careful attention to names: the names may be different in each of the 
above
+ * cases:
+ * <ul>
+ * <li>The table specification stores the input source and input format specs 
using
+ * the names defined by the classes themselves. That is, the table spec holds 
a string
+ * that represents the Jackson-serialized form of those classes. In some 
cases, the
+ * JSON can be a subset: some sources and formats have obscure checks, or 
options which
+ * are not available via this path. The code that does conversions will adjust 
the JSON
+ * prior to conversion. Each JSON object has a type field: the value of that 
type
+ * field must match that defined in the Jackson annotations for the 
corresponding
+ * class.</li>
+ * <li>SQL table functions use argument names that are typically selected for 
user
+ * convenience, and may not be the same as the JSON field name. For example, a 
field
+ * name may be a SQL reserved word, or may be overly long, or may be obscure. 
The code
+ * for each input source and input format definition does the needed 
conversion.</li>
+ * <li>Each input source and input format has a type. The input format type is 
given,
+ * in SQL by the {@code format} property. The format type name is typically 
the same
+ * as the JSON type name, but need not be.</li>
+ * </ul>
+ *
+ * <h4>Extensions</h4>
+ *
+ * This class is designed to work both with "well known" Druid input sources 
and formats,
+ * and those defined in an extension. For extension-defined sources and 
formats to work,
+ * the extension must define an {@link InputSourceDefn} or {@link 
InputFormatDefn} which
+ * are put into the {@link TableDefnRegistry} and thus available to this 
class. The result
+ * is that this class is ignorant of the actual details of sources and 
formats: it instead
+ * delegates to the input source and input format definitions for that work.
  * <p>
- * The external table implements the mechanism for parameterized tables,
- * but does not implement the {@link ParameterizedDefn} interface itself.
- * Tables which are parameterized implement that interface to expose
- * methods defined here.
+ * Input sources and input formats defined in an extension are considered 
"ephemeral":
+ * they can go away if the corresponding extension is removed from the system. 
In that
+ * case, any table functions defined by those extensions are no longer 
available, and
+ * any SQL statements that use those functions will no longer work. The 
catalog may contain
+ * an external table spec that references those definitions. Such specs will 
continue to
+ * reside in the catalog, and can be retrieved, but they will fail any query 
that attempts
+ * to reference them.
  */
-public abstract class ExternalTableDefn extends TableDefn
+public class ExternalTableDefn extends TableDefn
 {
-  public static final String EXTERNAL_COLUMN_TYPE = "extern";
-
-  public abstract static class FormattedExternalTableDefn extends 
ExternalTableDefn
-  {
-    public static final String FORMAT_PROPERTY = "format";
-
-    private final Map<String, InputFormatDefn> formats;
-
-    public FormattedExternalTableDefn(
-        final String name,
-        final String typeValue,
-        final List<PropertyDefn<?>> properties,
-        final List<ColumnDefn> columnDefns,
-        final List<InputFormatDefn> formats
-    )
-    {
-      super(
-          name,
-          typeValue,
-          addFormatProperties(properties, formats),
-          columnDefns
-      );
-      ImmutableMap.Builder<String, InputFormatDefn> builder = 
ImmutableMap.builder();
-      for (InputFormatDefn format : formats) {
-        builder.put(format.typeTag(), format);
-      }
-      this.formats = builder.build();
-    }
-
-    /**
-     * Add format properties to the base set, in the order of the formats,
-     * in the order defined by the format. Allow same-named properties across
-     * formats, as long as the types are the same.
-     */
-    private static List<PropertyDefn<?>> addFormatProperties(
-        final List<PropertyDefn<?>> properties,
-        final List<InputFormatDefn> formats
-    )
-    {
-      List<PropertyDefn<?>> toAdd = new ArrayList<>();
-      PropertyDefn<?> formatProp = new 
ModelProperties.StringPropertyDefn(FORMAT_PROPERTY, 
PropertyAttributes.SQL_FN_PARAM);
-      toAdd.add(formatProp);
-      Map<String, PropertyDefn<?>> formatProps = new HashMap<>();
-      for (InputFormatDefn format : formats) {
-        for (PropertyDefn<?> prop : format.properties()) {
-          PropertyDefn<?> existing = formatProps.putIfAbsent(prop.name(), 
prop);
-          if (existing == null) {
-            toAdd.add(prop);
-          } else if (existing.getClass() != prop.getClass()) {
-            throw new ISE(
-                "Format %s, property %s of class %s conflicts with another 
format property of class %s",
-                format.name(),
-                prop.name(),
-                prop.getClass().getSimpleName(),
-                existing.getClass().getSimpleName()
-            );
-          }
-        }
-      }
-      return CatalogUtils.concatLists(properties, toAdd);
-    }
-
-    @Override
-    protected InputFormat convertFormat(ResolvedTable table)
-    {
-      return formatDefn(table).convert(table);
-    }
+  /**
+   * Identifier for external tables.
+   */
+  public static final String TABLE_TYPE = "extern";
 
-    protected InputFormatDefn formatDefn(ResolvedTable table)
-    {
-      String formatTag = table.stringProperty(FORMAT_PROPERTY);
-      if (formatTag == null) {
-        throw new IAE("%s property must be set", FORMAT_PROPERTY);
-      }
-      InputFormatDefn formatDefn = formats.get(formatTag);
-      if (formatDefn == null) {
-        throw new IAE(
-            "Format type [%s] for property %s is not valid",
-            formatTag,
-            FORMAT_PROPERTY
-        );
-      }
-      return formatDefn;
-    }
+  /**
+   * Column type for external tables.
+   */
+  public static final String EXTERNAL_COLUMN_TYPE = "extern";
 
-    @Override
-    public void validate(ResolvedTable table)
-    {
-      super.validate(table);
-      formatDefn(table).validate(table);
-      List<ColumnSpec> columns = table.spec().columns();
-      if (columns == null || columns.isEmpty()) {
-        throw new IAE(
-            "An external table of type %s must specify one or more columns",
-            table.spec().type()
-        );
-      }
-    }
-  }
+  /**
+   * Property which holds the input source specification, serialized as JSON.
+   */
+  public static final String SOURCE_PROPERTY = "source";
 
   /**
-   * Definition of a column in a detail (non-rollup) datasource.
+   * Property which holds the optional input format specification, serialized 
as JSON.
    */
-  public static class ExternalColumnDefn extends ColumnDefn
-  {
-    public ExternalColumnDefn()
-    {
-      super(
-          "Column",
-          EXTERNAL_COLUMN_TYPE,
-          null
-      );
-    }
+  public static final String FORMAT_PROPERTY = "format";
 
-    @Override
-    public void validate(ColumnSpec spec, ObjectMapper jsonMapper)
-    {
-      super.validate(spec, jsonMapper);
-      validateScalarColumn(spec);
-    }
-  }
+  /**
+   * Definition of the input source property.
+   */
+  private static final PropertyDefn<InputSource> SOURCE_PROPERTY_DEFN =
+      new ObjectPropertyDefn<InputSource>(SOURCE_PROPERTY, InputSource.class);
 
-  protected static final ExternalColumnDefn INPUT_COLUMN_DEFN = new 
ExternalColumnDefn();
+  /**
+   * Definition of the input format property.
+   */
+  private static final PropertyDefn<InputFormat> FORMAT_PROPERTY_DEFN =
+      new ObjectPropertyDefn<InputFormat>(FORMAT_PROPERTY, InputFormat.class);
 
-  private final List<PropertyDefn<?>> fields;
+  /**
+   * Type reference used to deserialize JSON to a generic map.
+   */
+  @VisibleForTesting
+  public static final TypeReference<Map<String, Object>> MAP_TYPE_REF = new 
TypeReference<Map<String, Object>>() { };
 
-  public ExternalTableDefn(
-      final String name,
-      final String typeValue,
-      final List<PropertyDefn<?>> fields,
-      final List<ColumnDefn> columnDefns
-  )
-  {
-    super(name, typeValue, fields, columnDefns);
-    this.fields = fields;
-  }
+  private TableDefnRegistry registry;
 
-  public List<PropertyDefn<?>> parameters()
+  public ExternalTableDefn()
   {
-    return fields.stream()
-        .filter(f -> PropertyAttributes.isExternTableParameter(f))
-        .collect(Collectors.toList());
+    super(
+        "External table",
+        TABLE_TYPE,
+        Arrays.asList(
+            SOURCE_PROPERTY_DEFN,
+            FORMAT_PROPERTY_DEFN
+        ),
+        null
+    );
   }
 
-  public List<PropertyDefn<?>> tableFunctionParameters()
+  @Override
+  public void bind(TableDefnRegistry registry)
   {
-    return fields.stream()
-        .filter(f -> PropertyAttributes.isSqlFunctionParameter(f))
-        .collect(Collectors.toList());
+    this.registry = registry;
   }
 
-  /**
-   * Merge parameters provided by a SQL table function with the catalog 
information
-   * provided in the resolved table to produce a new resolved table used for a
-   * specific query.
-   */
-  public abstract ResolvedTable mergeParameters(ResolvedTable table, 
Map<String, Object> values);
-
-  public ExternalTableSpec convertToExtern(ResolvedTable table)
+  @Override
+  public void validate(ResolvedTable table)
   {
-    return new ExternalTableSpec(
-        convertSource(table),
-        convertFormat(table),
-        Columns.convertSignature(table.spec())
-    );
+    for (PropertyDefn<?> propDefn : properties().values()) {
+      if (propDefn != SOURCE_PROPERTY_DEFN && propDefn != 
FORMAT_PROPERTY_DEFN) {

Review Comment:
   No, because we are comparing instances. We want to match these exact 
instances, not some other object that happens to be `equals()` to these 
instances. Recall that most languages support two kinds of identity: instance 
identity (based on pointers) and value identity (based, in Java, on 
`equals()`). One generally cannot substitute one for the other.
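
   To illustrate the difference, here is a small sketch. `Prop` is a 
hypothetical class with a value-based `equals()`; it is not the real 
`PropertyDefn`, which may or may not override `equals()`. The point is only 
that `!=` and `==` test for the exact instance, while `equals()` can accept a 
look-alike.

```java
public class IdentitySketch
{
  // Hypothetical property definition with value-based equality on its name.
  public static final class Prop
  {
    private final String name;

    public Prop(String name)
    {
      this.name = name;
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof Prop && ((Prop) o).name.equals(name);
    }

    @Override
    public int hashCode()
    {
      return name.hashCode();
    }
  }

  // The one well-known instance we want to recognize.
  public static final Prop SOURCE = new Prop("source");

  // Instance identity: true only for the exact SOURCE object.
  public static boolean isBuiltIn(Prop prop)
  {
    return prop == SOURCE;
  }

  public static void main(String[] args)
  {
    Prop lookAlike = new Prop("source");
    System.out.println(lookAlike.equals(SOURCE)); // true: same value
    System.out.println(isBuiltIn(lookAlike));     // false: different instance
    System.out.println(isBuiltIn(SOURCE));        // true: the instance itself
  }
}
```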



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/catalog/model/table/S3InputSourceDefn.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import org.apache.druid.catalog.model.CatalogUtils;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.table.BaseTableFunction.Parameter;
+import org.apache.druid.catalog.model.table.TableFunction.ParameterDefn;
+import org.apache.druid.catalog.model.table.TableFunction.ParameterType;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.s3.S3InputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.storage.s3.S3StorageDruidModule;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Catalog definition for the S3 input source.
+ * <p>
+ * The catalog entry contains a serialized S3 input source, with some 
simplifying variations.
+ * The catalog entry can define the {@code bucket} table property which is the 
(single) bucket
+ * value to use when creating the list of objects: the catalog provides the 
bucket, the table
+ * function provides the list of objects. (Done this way since defining two 
correlated lists
+ * in SQL is awkward, and credentials make the most sense when working with a 
single bucket.)
+ * <p>
+ * The ad-hoc function allows the various ways to specify the objects, but not 
the configuration
+ * parameters. If the user wishes to use such parameters, they should be 
defined in a catalog
+ * entry, since providing maps in SQL is awkward.
+ * <p>
+ * The partial table function can be of various forms:
+ * <ul>
+ * <li>Fully define the table, which means providing the full set of S3 
properties and not
+ * providing the table-level {@code bucket} property. This form is complete and doesn't need
+ * a table function. If used with a table function, the function provides the 
{@code glob}
+ * parameter (if not already provided in the table spec.)</li>
+ * <li>Partially define the table: using URIs with the {@code glob} to be 
provided later, or
+ * by using the {@code bucket} table property. The table function provides the 
{@code objects}
+ * parameter to specify the specific objects. This form provides both the 
format and the list
+ * of columns.</li>
+ * <li>Partially define the table as a connection: provide only the {@code 
bucket} property,
+ * and omit both the format and the columns. The table function requests the 
{@code objects}
+ * and the {@code format}. The user must provide the list of columns.</li>
+ * </ul>
+ *
+ * @see {@link S3InputSource} for details on the meaning of the various 
properties, and the rules
+ * about valid combinations
+ */
+public class S3InputSourceDefn extends FormattedInputSourceDefn
+{
+  public static final String URIS_PARAMETER = "uris";
+  public static final String PREFIXES_PARAMETER = "prefixes";
+  public static final String BUCKET_PARAMETER = "bucket";
+  public static final String PATHS_PARAMETER = "paths";
+  public static final String ACCESS_KEY_ID_PARAMETER = "accessKeyId";
+  public static final String SECRET_ACCESS_KEY_PARAMETER = "secretAccessKey";
+  public static final String ASSUME_ROLE_ARN_PARAMETER = "assumeRoleArn";
+
+  /**
+   * The {@code objectGlob} property exists in S3, but is not documented. The 
corresponding
+   * function parameter also exists, but is not documented.
+   */
+  public static final String OBJECT_GLOB_PARAMETER = "objectGlob";
+
+  public static final String BUCKET_PROPERTY = "bucket";
+
+  private static final ParameterDefn URI_PARAM_DEFN = new 
Parameter(URIS_PARAMETER, ParameterType.VARCHAR_ARRAY, true);
+  private static final ParameterDefn PREFIXES_PARAM_DEFN = new 
Parameter(PREFIXES_PARAMETER, ParameterType.VARCHAR_ARRAY, true);
+  private static final ParameterDefn BUCKET_PARAM_DEFN = new 
Parameter(BUCKET_PARAMETER, ParameterType.VARCHAR, true);
+  private static final ParameterDefn PATHS_PARAM_DEFN = new 
Parameter(PATHS_PARAMETER, ParameterType.VARCHAR_ARRAY, true);
+  private static final ParameterDefn OBJECTS_GLOB_PARAM_DEFN = new 
Parameter(OBJECT_GLOB_PARAMETER, ParameterType.VARCHAR, true);
+  private static final List<ParameterDefn> SECURITY_PARAMS = Arrays.asList(
+      new Parameter(ACCESS_KEY_ID_PARAMETER, ParameterType.VARCHAR, true),
+      new Parameter(SECRET_ACCESS_KEY_PARAMETER, ParameterType.VARCHAR, true),
+      new Parameter(ASSUME_ROLE_ARN_PARAMETER, ParameterType.VARCHAR, true)
+  );
+
+  // Field names in the S3InputSource
+  private static final String URIS_FIELD = "uris";
+  private static final String PREFIXES_FIELD = "prefixes";
+  private static final String OBJECTS_FIELD = "objects";
+  private static final String OBJECT_GLOB_FIELD = "objectGlob";
+  private static final String PROPERTIES_FIELD = "properties";
+  private static final String ACCESS_KEY_ID_FIELD = "accessKeyId";
+  private static final String SECRET_ACCESS_KEY_FIELD = "secretAccessKey";
+  private static final String ASSUME_ROLE_ARN_FIELD = "assumeRoleArn";
+
+  @Override
+  public String typeValue()
+  {
+    return S3StorageDruidModule.SCHEME;
+  }
+
+  @Override
+  protected Class<? extends InputSource> inputSourceClass()
+  {
+    return S3InputSource.class;
+  }
+
+  @Override
+  public void validate(ResolvedExternalTable table)
+  {
+    final boolean hasFormat = table.inputFormatMap != null;
+    final boolean hasColumns = 
!CollectionUtils.isNullOrEmpty(table.resolvedTable().spec().columns());
+
+    if (hasFormat && !hasColumns) {
+      throw new IAE(
+          "An external S3 table with a format must also provide the 
corresponding columns"
+      );
+    }
+
+    // The user can either provide a bucket, or can provide one of the valid 
items.
+    final String bucket = 
table.resolvedTable().stringProperty(BUCKET_PROPERTY);
+    final boolean hasBucket = bucket != null;
+    final Map<String, Object> sourceMap = table.inputSourceMap;
+    final boolean hasUris = sourceMap.containsKey(URIS_FIELD);
+    final boolean hasPrefix = sourceMap.containsKey(PREFIXES_FIELD);
+    final boolean hasObjects = sourceMap.containsKey(OBJECTS_FIELD);
+    final boolean hasGlob = sourceMap.containsKey(OBJECT_GLOB_FIELD);
+    if (hasBucket) {
+      if (hasUris || hasPrefix || hasObjects) {
+        throw new IAE(
+            "Provide either the %s property, or one of the S3 input source 
fields %s, %s or %s, but not both.",
+            BUCKET_PROPERTY,
+            URIS_FIELD,
+            PREFIXES_FIELD,
+            OBJECTS_FIELD
+        );
+      }
+      if (hasGlob) {
+        throw new IAE(
+            "The %s property cannot be provided when the %s property is set",
+            OBJECT_GLOB_FIELD,
+            BUCKET_PROPERTY
+        );
+      }
+
+      // Patch in a dummy URI so that validation of the rest of the fields
+      // will pass.
+      sourceMap.put(URIS_FIELD, Collections.singletonList(bucket));

Review Comment:
   IIRC, the S3 input source requires one of URIs, objects, or some such. 
(Druid does not support an S3 input source with nothing to read.)
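
   Roughly, the pattern looks like the sketch below. This is an illustration 
under assumptions, not the PR's code: `requireSomethingToRead` is a 
hypothetical, simplified stand-in for the input source's own check, the 
`s3://<bucket>/placeholder` form of the dummy URI is invented for the example, 
and the sketch patches a copy of the map rather than the map itself.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DummyUriSketch
{
  // Hypothetical, simplified version of the input source's rule:
  // there must be something to read.
  public static void requireSomethingToRead(Map<String, Object> sourceMap)
  {
    boolean hasTarget = sourceMap.containsKey("uris")
        || sourceMap.containsKey("prefixes")
        || sourceMap.containsKey("objects");
    if (!hasTarget) {
      throw new IllegalArgumentException("Provide one of uris, prefixes or objects");
    }
  }

  // Validate a partial spec. When only the bucket is known, patch in a
  // placeholder URI so the remaining fields can still be checked.
  public static void validatePartial(Map<String, Object> sourceMap, String bucket)
  {
    Map<String, Object> toCheck = new HashMap<>(sourceMap);
    if (bucket != null) {
      // Placeholder only: the real objects arrive later via the table function.
      toCheck.put("uris", Collections.singletonList("s3://" + bucket + "/placeholder"));
    }
    requireSomethingToRead(toCheck);
  }

  public static void main(String[] args)
  {
    Map<String, Object> partial = new HashMap<>();
    partial.put("type", "s3");

    // Passes: the bucket supplies the placeholder.
    validatePartial(partial, "my-bucket");

    // Fails: no bucket and nothing to read.
    try {
      validatePartial(partial, null);
    }
    catch (IllegalArgumentException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```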



##########
server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for input source definitions.
+ *
+ * @see {@link FormattedInputSourceDefn} for the base class for (most) input sources
+ * which take an input format.
+ */
+public abstract class BaseInputSourceDefn implements InputSourceDefn
+{
+  /**
+   * The "from-scratch" table function for this input source. The parameters
+   * are those defined by the subclass, and the apply simply turns around and
+   * asks the input source definition to do the conversion.
+   */
+  public class AdHocTableFunction extends BaseTableFunction
+  {
+    public AdHocTableFunction(List<ParameterDefn> parameters)
+    {
+      super(parameters);
+    }
+
+    @Override
+    public ExternalTableSpec apply(
+        final String fnName,
+        final Map<String, Object> args,
+        final List<ColumnSpec> columns,
+        final ObjectMapper jsonMapper
+    )
+    {
+      requireSchema(fnName, columns);
+      return convertArgsToTable(args, columns, jsonMapper);
+    }
+  }
+
+  /**
+   * The "partial" table function that starts with a catalog external table 
spec, then
+   * uses SQL function arguments to "complete" (i.e. fill in) the missing 
properties to
+   * produce a complete table which is then converted to an external table 
which Calcite
+   * can use.
+   * <p>
+   * The set of parameters depends on the input source and on whether or not 
the catalog
+   * spec provides a format.
+   */
+  public class PartialTableFunction extends BaseTableFunction
+  {
+    private final ResolvedExternalTable table;
+
+    public PartialTableFunction(final ResolvedExternalTable table, 
List<ParameterDefn> params)
+    {
+      super(params);
+      this.table = table;
+    }
+
+    @Override
+    public ExternalTableSpec apply(
+        final String fnName,
+        final Map<String, Object> args,
+        final List<ColumnSpec> columns,
+        final ObjectMapper jsonMapper
+    )
+    {
+      if 
(CollectionUtils.isNullOrEmpty(table.resolvedTable().spec().columns())) {
+        requireSchema(fnName, columns);
+      }
+      return convertCompletedTable(table, args, columns);
+    }
+  }
+
+  /**
+   * The one and only from-scratch table function for this input source. The
+   * function is defined at bind time, not construction time, since it typically
+   * needs visibility to the set of available input formats.
+   */
+  private AdHocTableFunction adHocTableFn;
+
+  /**
+   * Overridden by each subclass to return the input source class to be
+   * used for JSON conversions.
+   */
+  protected abstract Class<? extends InputSource> inputSourceClass();
+
+  @Override
+  public void bind(TableDefnRegistry registry)
+  {
+    this.adHocTableFn = defineAdHocTableFunction();
+  }
+
+  @Override
+  public void validate(ResolvedExternalTable table)
+  {
+    convertTableToSource(table);
+  }
+
+  /**
+   * Overridden by each subclass to define the parameters needed by each
+   * input source.
+   */
+  protected abstract AdHocTableFunction defineAdHocTableFunction();
+
+  @Override
+  public TableFunction adHocTableFn()
+  {
+    return adHocTableFn;
+  }
+
+  /**
+   * Define a table "from scratch" using SQL function arguments.
+   */
+  protected ExternalTableSpec convertArgsToTable(
+      final Map<String, Object> args,
+      final List<ColumnSpec> columns,
+      final ObjectMapper jsonMapper
+  )
+  {
+    return new ExternalTableSpec(
+        convertArgsToSource(args, jsonMapper),
+        convertArgsToFormat(args, columns, jsonMapper),
+        Columns.convertSignature(columns)
+    );
+  }
+
+  /**
+   * Convert the input source using arguments to a "from scratch" table 
function.
+   */
+  protected InputSource convertArgsToSource(Map<String, Object> args, 
ObjectMapper jsonMapper)

Review Comment:
   See note above. Table functions are generic. Here we're using them for input 
sources.



##########
server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java:
##########
@@ -144,4 +188,125 @@ public static <T> List<T> concatLists(
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  /**
+   * Get a string parameter that can either be null or non-blank.
+   */
+  public static String getNonBlankString(Map<String, Object> args, String 
parameter)
+  {
+    String value = CatalogUtils.getString(args, parameter);
+    if (value != null) {
+      value = value.trim();
+      if (value.isEmpty()) {
+        throw new IAE("%s parameter cannot be a blank string", parameter);
+      }
+    }
+    return value;
+  }
+
+  public static List<String> getUriListArg(Map<String, Object> args, String 
parameter)
+  {
+    String urisString = CatalogUtils.getString(args, parameter);
+    if (Strings.isNullOrEmpty(urisString)) {
+      throw new IAE("One or more values are required for parameter %s", 
parameter);
+    }
+    return stringToList(urisString);
+  }
+
+  public static List<URI> stringToUriList(String uris)
+  {
+    return stringListToUriList(stringToList(uris));
+  }
+
+  /**
+   * Convert a list of strings to a list of {@link URI} objects.
+   */
+  public static List<URI> stringListToUriList(List<String> list)
+  {
+    if (list == null) {
+      return null;
+    }
+    List<URI> uris = new ArrayList<>();
+    for (String strValue : list) {
+      try {
+        uris.add(new URI(strValue));
+      }
+      catch (URISyntaxException e) {
+        throw new IAE(StringUtils.format("Argument [%s] is not a valid URI", 
strValue));
+      }
+    }
+    return uris;
+  }
+
+  /**
+   * Merge the properties for an object using a set of updates in a map. If the
+   * update value is {@code null}, then remove the property in the revised 
set. If the
+   * property is known, use the column definition to merge the values. Else, 
the
+   * update replaces any existing value.
+   * <p>
+   * This method does not validate the properties, except as needed to do a
+   * merge. A separate validation step is done on the final, merged object.
+   */
+  public static Map<String, Object> mergeProperties(
+      final Map<String, PropertyDefn<?>> properties,
+      final Map<String, Object> source,
+      final Map<String, Object> update
+  )
+  {
+    if (update == null) {
+      return source;
+    }
+    if (source == null) {
+      return update;
+    }
+    final Map<String, Object> merged = new HashMap<>(source);
+    for (Map.Entry<String, Object> entry : update.entrySet()) {
+      if (entry.getValue() == null) {
+        merged.remove(entry.getKey());
+      } else {
+        Object value = entry.getValue();
+        final PropertyDefn<?> propDefn = properties.get(entry.getKey());
+        if (propDefn != null) {
+          value = propDefn.merge(merged.get(entry.getKey()), entry.getValue());
+        }
+        merged.put(entry.getKey(), value);
+      }
+    }
+    return merged;
+  }
+
+  public static void validateGranularity(String value)
+  {
+    if (value == null) {
+      return;
+    }
+    Granularity granularity;
+    try {
+      granularity = new PeriodGranularity(new Period(value), null, null);
+    }
+    catch (IllegalArgumentException e) {
+      throw new IAE(StringUtils.format("[%s] is an invalid granularity 
string", value));
+    }
+    if (!GranularityType.isStandard(granularity)) {
+      throw new IAE(
+          "Unsupported segment granularity. "
+          + "Please use an equivalent of these granularities: %s.",
+          Arrays.stream(GranularityType.values())
+                .filter(granularityType -> 
!granularityType.equals(GranularityType.NONE))
+                .map(Enum::name)

Review Comment:
   This is an error message, copied from somewhere in MSQ. The error message 
shouldn't get copied into the spec (unless I've done something very wrong.)
   
   There was a comment in an early PR that someone didn't like two ways to 
specify granularities, and we wanted only the ISO 8601 format. So, the user 
should use that. Easy enough to fix if we want to do something else.
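
For reference, a minimal sketch of what "ISO 8601 format" means for a granularity string. It uses the JDK's `java.time` classes rather than the Joda `Period` that the PR code uses, so it only approximates the real check; the class name `GranularityStrings` and the helper `isIsoPeriod` are hypothetical and not part of the PR.

```java
import java.time.Duration;
import java.time.Period;
import java.time.format.DateTimeParseException;

class GranularityStrings
{
  /**
   * Hypothetical helper: returns true if the value looks like an ISO 8601
   * period ("P1D", "P1M") or duration ("PT1H"). Names such as "DAY" are
   * rejected. This is an approximation using java.time, not Joda.
   */
  static boolean isIsoPeriod(String value)
  {
    if (value == null) {
      return false;
    }
    try {
      Period.parse(value);      // date-based: years, months, weeks, days
      return true;
    }
    catch (DateTimeParseException e) {
      // Not a date-based period; try a time-based duration next.
    }
    try {
      Duration.parse(value);    // time-based: hours, minutes, seconds
      return true;
    }
    catch (DateTimeParseException e) {
      return false;
    }
  }

  public static void main(String[] args)
  {
    for (String s : new String[]{"P1D", "PT1H", "P1M", "DAY"}) {
      System.out.println(s + " -> " + isIsoPeriod(s));
    }
  }
}
```

Under this sketch, a user would write `P1D` where the other style would have used a name like `DAY`.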



##########
server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for input source definitions.
+ *
+ * @see {@link FormattedInputSourceDefn} for the base class for (most) input sources
+ * which take an input format.
+ */
+public abstract class BaseInputSourceDefn implements InputSourceDefn
+{
+  /**
+   * The "from-scratch" table function for this input source. The parameters
+   * are those defined by the subclass, and the apply simply turns around and
+   * asks the input source definition to do the conversion.
+   */
+  public class AdHocTableFunction extends BaseTableFunction
+  {
+    public AdHocTableFunction(List<ParameterDefn> parameters)
+    {
+      super(parameters);
+    }
+
+    @Override
+    public ExternalTableSpec apply(
+        final String fnName,
+        final Map<String, Object> args,
+        final List<ColumnSpec> columns,
+        final ObjectMapper jsonMapper
+    )
+    {
+      requireSchema(fnName, columns);
+      return convertArgsToTable(args, columns, jsonMapper);
+    }
+  }
+
+  /**
+   * The "partial" table function that starts with a catalog external table 
spec, then
+   * uses SQL function arguments to "complete" (i.e. fill in) the missing 
properties to
+   * produce a complete table which is then converted to an external table 
which Calcite
+   * can use.
+   * <p>
+   * The set of parameters depends on the input source and on whether or not 
the catalog
+   * spec provides a format.
+   */
+  public class PartialTableFunction extends BaseTableFunction
+  {
+    private final ResolvedExternalTable table;
+
+    public PartialTableFunction(final ResolvedExternalTable table, 
List<ParameterDefn> params)
+    {
+      super(params);
+      this.table = table;
+    }
+
+    @Override
+    public ExternalTableSpec apply(
+        final String fnName,
+        final Map<String, Object> args,
+        final List<ColumnSpec> columns,
+        final ObjectMapper jsonMapper
+    )
+    {
+      if 
(CollectionUtils.isNullOrEmpty(table.resolvedTable().spec().columns())) {
+        requireSchema(fnName, columns);
+      }
+      return convertCompletedTable(table, args, columns);
+    }
+  }
+
+  /**
+   * The one and only from-scratch table function for this input source. The
+   * function is defined at bind time, not construction time, since it typically
+   * needs visibility to the set of available input formats.
+   */
+  private AdHocTableFunction adHocTableFn;
+
+  /**
+   * Overridden by each subclass to return the input source class to be
+   * used for JSON conversions.
+   */
+  protected abstract Class<? extends InputSource> inputSourceClass();
+
+  @Override
+  public void bind(TableDefnRegistry registry)
+  {
+    this.adHocTableFn = defineAdHocTableFunction();
+  }
+
+  @Override
+  public void validate(ResolvedExternalTable table)
+  {
+    convertTableToSource(table);
+  }
+
+  /**
+   * Overridden by each subclass to define the parameters needed by each
+   * input source.
+   */
+  protected abstract AdHocTableFunction defineAdHocTableFunction();
+
+  @Override
+  public TableFunction adHocTableFn()
+  {
+    return adHocTableFn;
+  }
+
+  /**
+   * Define a table "from scratch" using SQL function arguments.
+   */
+  protected ExternalTableSpec convertArgsToTable(

Review Comment:
   The idea is that the table function is generic. Here, the base input source 
says that, if you don't want to do anything different _for input sources_, just 
do the generic conversion. If this moved into the table function class, it 
would make the table function class only applicable to input sources, and would 
require binding the base table function class to an input source. The present 
design avoids this unwanted coupling.
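
A stripped-down sketch of that layering, to make the coupling argument concrete. All names here (`TableFunctionSketch`, `InputSourceDefn`, `HttpDefn`, `LocalDefn`) are hypothetical stand-ins, and strings replace the real spec objects; this is not the PR's code.

```java
import java.util.Collections;
import java.util.Map;

class TableFunctionSketch
{
  /** Generic contract: nothing here knows about input sources. */
  interface TableFunction
  {
    String apply(Map<String, Object> args);
  }

  /** Hypothetical stand-in for the base input source definition. */
  abstract static class InputSourceDefn
  {
    /** Inner class: simply turns around and asks the enclosing definition. */
    class AdHocFunction implements TableFunction
    {
      @Override
      public String apply(Map<String, Object> args)
      {
        return convertArgsToTable(args);
      }
    }

    TableFunction adHocTableFn()
    {
      return new AdHocFunction();
    }

    /** Generic conversion; a subclass overrides it only if it needs to. */
    protected String convertArgsToTable(Map<String, Object> args)
    {
      return "generic" + args.keySet();
    }
  }

  static class HttpDefn extends InputSourceDefn
  {
    @Override
    protected String convertArgsToTable(Map<String, Object> args)
    {
      return "http:" + args.get("uris");
    }
  }

  /** Inherits the generic conversion unchanged. */
  static class LocalDefn extends InputSourceDefn
  {
  }

  public static void main(String[] args)
  {
    System.out.println(new HttpDefn().adHocTableFn().apply(Collections.singletonMap("uris", "x")));
    System.out.println(new LocalDefn().adHocTableFn().apply(Collections.singletonMap("baseDir", "/tmp")));
  }
}
```

The `TableFunction` interface stays usable by things that are not input sources, since the conversion lives on the definition rather than on the function.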



##########
server/src/main/java/org/apache/druid/catalog/model/table/ResolvedExternalTable.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import org.apache.druid.catalog.model.CatalogUtils;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Internal class to hold the intermediate form of an external table: the
+ * input source and input format properties converted to Java maps, and the
+ * types of each resolved to the corresponding definitions. Used to validate
+ * a table specification, and to convert a table specification to an
+ * {@link ExternalTableSpec} when used in SQL.
+ */
+public class ResolvedExternalTable
+{
+  private final ResolvedTable table;
+  protected final Map<String, Object> inputSourceMap;
+  protected final Map<String, Object> inputFormatMap;
+  private InputSourceDefn inputSourceDefn;
+  private InputFormatDefn inputFormatDefn;
+
+  /**
+   * Construct a resolved external table by extracting the input source
+   * and input format properties, and converting each to a Java map.
+   * Validates that the input source is present: the format is optional.
+   * <p>
+   * Note: does <i>not</i> resolve the input source and input format
+   * definitions: that is done as a separate step when needed.
+   *
+   * @see {@link #resolve(TableDefnRegistry)}.
+   */
+  public ResolvedExternalTable(final ResolvedTable table)
+  {
+    this.table = table;
+    Map<String, Object> map = 
table.mapProperty(ExternalTableDefn.SOURCE_PROPERTY);
+    if (map == null || map.isEmpty()) {
+      throw new IAE("%s property is required", 
ExternalTableDefn.SOURCE_PROPERTY);
+    }
+    this.inputSourceMap = new HashMap<>(map);
+    map = table.mapProperty(ExternalTableDefn.FORMAT_PROPERTY);
+    this.inputFormatMap = map == null ? null : new HashMap<>(map);
+  }
+
+  public ResolvedTable resolvedTable()
+  {
+    return table;
+  }
+
+  /**
+   * Look up the input source type and input format type to find the 
corresponding
+   * definitions in the table registry. Throws an exception if the types are 
not
+   * defined. The input source is required, the format is optional.
+   * <p>
+   * Note, for resolution to work, the name of each definition must be the 
same as
+   * that used as the type key in the serialized JSON.
+   */
+  public ResolvedExternalTable resolve(TableDefnRegistry registry)
+  {
+    String inputSourceType = CatalogUtils.getString(inputSourceMap, 
InputSource.TYPE_PROPERTY);
+    if (inputSourceType == null) {
+      throw new IAE("Input source type %s is required", 
InputSource.TYPE_PROPERTY);
+    }
+    inputSourceDefn = registry.inputSourceDefnFor(inputSourceType);
+    if (inputSourceDefn == null) {
+      throw new IAE("Input source type %s is not registered", inputSourceType);
+    }
+    if (inputFormatMap != null) {
+      String inputFormatType = CatalogUtils.getString(inputFormatMap, 
InputFormat.TYPE_PROPERTY);
+      if (inputFormatType != null) {

Review Comment:
   This is a simple holder for information read from the catalog. It is 
perfectly legal for the catalog to define an external table that has no input 
format. Imagine a "connection" kind of thing: I specify to use S3, provide the 
bucket, provide credentials, etc. But, I expect each query to provide the 
actual file names and the format of those files. In that case, the catalog 
entry for my "connection" would contain no format: the code here will require, 
however, that I provide that format via a table function at query time.
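
A rough sketch of the "connection" idea, assuming plain maps for both the catalog entry and the query-time arguments. The class `PartialTableSketch`, the method `complete`, and the key names are hypothetical; the real code goes through `PartialTableFunction` and the input source definition.

```java
import java.util.HashMap;
import java.util.Map;

class PartialTableSketch
{
  /**
   * Hypothetical helper: overlay the query-time arguments on the catalog
   * entry, then insist that a format has been supplied by one or the other.
   */
  static Map<String, Object> complete(
      Map<String, Object> catalogProps,
      Map<String, Object> queryArgs
  )
  {
    Map<String, Object> merged = new HashMap<>(catalogProps);
    merged.putAll(queryArgs);
    if (!merged.containsKey("format")) {
      throw new IllegalArgumentException(
          "The catalog entry has no format: provide one in the table function call"
      );
    }
    return merged;
  }

  public static void main(String[] args)
  {
    // The catalog "connection": where to read, but not what or in which format.
    Map<String, Object> connection = new HashMap<>();
    connection.put("type", "s3");
    connection.put("bucket", "example-bucket");

    // Each query fills in the rest.
    Map<String, Object> queryArgs = new HashMap<>();
    queryArgs.put("objects", "a.csv");
    queryArgs.put("format", "csv");

    System.out.println(complete(connection, queryArgs));
  }
}
```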



##########
server/src/main/java/org/apache/druid/catalog/model/TableDefnRegistry.java:
##########
@@ -55,40 +61,76 @@
 {
   // Temporary list of Druid-defined table definitions. This should come from
   // Guice later to allow extensions to define table types.
-  private static final TableDefn[] TABLE_DEFNS = {
+  private static final List<TableDefn> BUILTIN_TABLE_DEFNS = Arrays.asList(
       new DatasourceDefn(),
-      new InlineTableDefn(),
-      new HttpTableDefn(),
-      new LocalTableDefn()
-  };
+      new ExternalTableDefn()
+  );
+  private static final List<InputSourceDefn> BUILTIN_INPUT_SOURCE_DEFNS = 
Arrays.asList(
+      new InlineInputSourceDefn(),
+      new HttpInputSourceDefn(),
+      new LocalInputSourceDefn()
+  );
+  private static final List<InputFormatDefn> BUILTIN_INPUT_FORMAT_DEFNS = 
Arrays.asList(
+      new InputFormats.CsvFormatDefn(),
+      new InputFormats.DelimitedFormatDefn(),
+      new InputFormats.JsonFormatDefn()
+  );
 
-  private final Map<String, TableDefn> defns;
+  private final Map<String, TableDefn> tableDefns;
+  private final Map<String, InputSourceDefn> inputSourceDefns;
+  private final Map<String, InputFormatDefn> inputFormatDefns;
   private final ObjectMapper jsonMapper;
 
   public TableDefnRegistry(
-      final TableDefn[] defns,
+      @Nullable final List<TableDefn> tableDefnExtns,
+      @Nullable final List<InputSourceDefn> inputSourceDefnExtns,
+      @Nullable final List<InputFormatDefn> inputFormatDefnExtns,
       final ObjectMapper jsonMapper
   )
   {
-    ImmutableMap.Builder<String, TableDefn> builder = ImmutableMap.builder();
-    for (TableDefn defn : defns) {
-      builder.put(defn.typeValue(), defn);
-    }
-    this.defns = builder.build();
     this.jsonMapper = jsonMapper;
+    final List<TableDefn> tableDefns = 
CatalogUtils.concatLists(tableDefnExtns, BUILTIN_TABLE_DEFNS);
+    final List<InputSourceDefn> inputSourceDefns = 
CatalogUtils.concatLists(inputSourceDefnExtns, BUILTIN_INPUT_SOURCE_DEFNS);
+    final List<InputFormatDefn> inputFormatDefns = 
CatalogUtils.concatLists(inputFormatDefnExtns, BUILTIN_INPUT_FORMAT_DEFNS);
+
+    ImmutableMap.Builder<String, TableDefn> tableBuilder = 
ImmutableMap.builder();
+    for (TableDefn defn : tableDefns) {
+      tableBuilder.put(defn.typeValue(), defn);
+    }
+    this.tableDefns = tableBuilder.build();
+
+    ImmutableMap.Builder<String, InputSourceDefn> sourceBuilder = 
ImmutableMap.builder();
+    for (InputSourceDefn defn : inputSourceDefns) {
+      sourceBuilder.put(defn.typeValue(), defn);
+    }
+    this.inputSourceDefns = sourceBuilder.build();
+
+    ImmutableMap.Builder<String, InputFormatDefn> formatBuilder = 
ImmutableMap.builder();
+    for (InputFormatDefn defn : inputFormatDefns) {
+      formatBuilder.put(defn.typeValue(), defn);
+    }
+    this.inputFormatDefns = formatBuilder.build();
+
+    // Initialize all items once the entire set of bindings is defined.
+    for (InputSourceDefn defn : inputSourceDefns) {
+      defn.bind(this);
+    }
+    for (TableDefn defn : tableDefns) {
+      defn.bind(this);
+    }

Review Comment:
   We could, but, at present, input formats don't depend on anything else. 
Input source definitions depend on input formats. External tables depend on 
input sources. But formats are at the leaf: they don't depend on anything.
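
A small sketch of why the bind step runs only after every map is populated, and why formats need no bind of their own. The classes below (`BindOrderSketch`, `Registry`, `SourceDefn`, `FormatDefn`) are hypothetical simplifications, not the PR's `TableDefnRegistry`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BindOrderSketch
{
  /** Leaf: depends on nothing, so it has no bind step. */
  static class FormatDefn
  {
    final String type;

    FormatDefn(String type)
    {
      this.type = type;
    }
  }

  /** Depends on the set of formats, which is known only at bind time. */
  static class SourceDefn
  {
    final String type;
    List<String> knownFormats;

    SourceDefn(String type)
    {
      this.type = type;
    }

    void bind(Registry registry)
    {
      knownFormats = new ArrayList<>(registry.formats.keySet());
    }
  }

  static class Registry
  {
    final Map<String, FormatDefn> formats = new LinkedHashMap<>();
    final Map<String, SourceDefn> sources = new LinkedHashMap<>();

    Registry(List<FormatDefn> formatDefns, List<SourceDefn> sourceDefns)
    {
      for (FormatDefn defn : formatDefns) {
        formats.put(defn.type, defn);
      }
      for (SourceDefn defn : sourceDefns) {
        sources.put(defn.type, defn);
      }
      // Bind only once the entire set of definitions is in place.
      for (SourceDefn defn : sourceDefns) {
        defn.bind(this);
      }
    }
  }

  public static void main(String[] args)
  {
    Registry registry = new Registry(
        Arrays.asList(new FormatDefn("csv"), new FormatDefn("json")),
        Arrays.asList(new SourceDefn("http"))
    );
    System.out.println(registry.sources.get("http").knownFormats);
  }
}
```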



##########
server/src/main/java/org/apache/druid/catalog/model/table/ResolvedExternalTable.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import org.apache.druid.catalog.model.CatalogUtils;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Internal class to hold the intermediate form of an external table: the
+ * input source and input format properties converted to Java maps, and the
+ * types of each resolved to the corresponding definitions. Used to validate
+ * a table specification, and to convert a table specification to an
+ * {@link ExternalTableSpec} when used in SQL.
+ */
+public class ResolvedExternalTable
+{
+  private final ResolvedTable table;
+  protected final Map<String, Object> inputSourceMap;
+  protected final Map<String, Object> inputFormatMap;
+  private InputSourceDefn inputSourceDefn;
+  private InputFormatDefn inputFormatDefn;
+
+  /**
+   * Construct a resolved external table by extracting the input source
+   * and input format properties, and converting each to a Java map.
+   * Validates that the input source is present: the format is optional.
+   * <p>
+   * Note: does <i>not</i> resolve the input source and input format
+   * definitions: that is done as a separate step when needed.
+   *
+   * @see {@link #resolve(TableDefnRegistry)}.
+   */
+  public ResolvedExternalTable(final ResolvedTable table)
+  {
+    this.table = table;
+    Map<String, Object> map = 
table.mapProperty(ExternalTableDefn.SOURCE_PROPERTY);
+    if (map == null || map.isEmpty()) {
+      throw new IAE("%s property is required", 
ExternalTableDefn.SOURCE_PROPERTY);
+    }
+    this.inputSourceMap = new HashMap<>(map);
+    map = table.mapProperty(ExternalTableDefn.FORMAT_PROPERTY);
+    this.inputFormatMap = map == null ? null : new HashMap<>(map);
+  }
+
+  public ResolvedTable resolvedTable()
+  {
+    return table;
+  }
+
+  /**
+   * Look up the input source type and input format type to find the 
corresponding
+   * definitions in the table registry. Throws an exception if the types are 
not
+   * defined. The input source is required, the format is optional.
+   * <p>
+   * Note, for resolution to work, the name of each definition must be the 
same as
+   * that used as the type key in the serialized JSON.
+   */
+  public ResolvedExternalTable resolve(TableDefnRegistry registry)
+  {
+    String inputSourceType = CatalogUtils.getString(inputSourceMap, 
InputSource.TYPE_PROPERTY);
+    if (inputSourceType == null) {
+      throw new IAE("Input source type %s is required", 
InputSource.TYPE_PROPERTY);
+    }
+    inputSourceDefn = registry.inputSourceDefnFor(inputSourceType);
+    if (inputSourceDefn == null) {
+      throw new IAE("Input source type %s is not registered", inputSourceType);
+    }
+    if (inputFormatMap != null) {
+      String inputFormatType = CatalogUtils.getString(inputFormatMap, 
InputFormat.TYPE_PROPERTY);
+      if (inputFormatType != null) {
+        inputFormatDefn = registry.inputFormatDefnFor(inputFormatType);
+        if (inputFormatDefn == null) {
+          throw new IAE("Input format type %s is not registered", 
inputFormatType);
+        }
+      }
+    }
+    return this;
+  }
+
+  /**
+   * Validate that the table spec is correct by resolving the definitions, then
+   * converting the JSON to the desired object type. Note that this path 
requires
+   * special handling: the table spec may be partial, which means it is 
missing information
+   * needed to create a complete input source. The input source definition 
defines which
+   * values can be omitted, and defined later in SQL via function parameters. 
If those
+   * values are missing, then the input source defn should provide dummy 
values so that

Review Comment:
   The catalog can hold a partial input source. See above comment for a use 
case. We still want to validate the properties that are provided. Druid input 
sources are designed to work only if all properties are provided: they don't 
"do" partial definitions. To avoid the need to duplicate existing code, and to 
allow the catalog to support partial definitions, even if Druid doesn't, we 
just patch in dummy missing values to keep Druid happy. Ugly? Of course. But, 
easier than trying to convince Druid's input sources to change.
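
Roughly, the trick looks like the sketch below. Everything in it is hypothetical: `strictValidate` stands in for an existing all-or-nothing input source check, and `DUMMY_URI` is a made-up placeholder. Unlike the PR code, which patches the source map in place, the sketch patches a copy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DummyValueSketch
{
  // Placeholder value: an assumption for this sketch, never actually read.
  static final String DUMMY_URI = "http://example.invalid/placeholder";

  /** Stand-in for an existing validator that only accepts complete sources. */
  static void strictValidate(Map<String, Object> source)
  {
    Object uris = source.get("uris");
    if (!(uris instanceof List) || ((List<?>) uris).isEmpty()) {
      throw new IllegalArgumentException("uris is required");
    }
    Object user = source.get("userName");
    if (user != null && user.toString().trim().isEmpty()) {
      throw new IllegalArgumentException("userName cannot be blank");
    }
  }

  /**
   * Validate a partial definition: fill in a dummy for the value that may be
   * supplied later in SQL, then reuse the strict check on everything else.
   */
  static void validatePartial(Map<String, Object> partial)
  {
    Map<String, Object> patched = new HashMap<>(partial);
    patched.putIfAbsent("uris", Collections.singletonList(DUMMY_URI));
    strictValidate(patched);
  }

  public static void main(String[] args)
  {
    Map<String, Object> partial = new HashMap<>();
    partial.put("userName", "bob");
    validatePartial(partial);   // passes: the provided properties are fine
    System.out.println("partial definition accepted");
  }
}
```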



##########
server/src/main/java/org/apache/druid/catalog/model/table/BaseTableFunction.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.List;
+
+/**
+ * Base implementation for a table function definition.
+ *
+ * @see {@link TableFunction}
+ */
+public abstract class BaseTableFunction implements TableFunction
+{
+  public static class Parameter implements ParameterDefn
+  {
+    private final String name;
+    private final ParameterType type;
+    private final boolean optional;
+
+    public Parameter(String name, ParameterType type, boolean optional)
+    {
+      this.name = name;
+      this.type = type;
+      this.optional = optional;
+    }
+
+    @Override
+    public String name()
+    {
+      return name;
+    }
+
+    @Override
+    public ParameterType type()
+    {
+      return type;
+    }
+
+    @Override
+    public boolean isOptional()
+    {
+      return optional;
+    }
+  }
+
+  private final List<ParameterDefn> parameters;
+
+  public BaseTableFunction(List<ParameterDefn> parameters)
+  {
+    this.parameters = parameters;
+  }
+
+  @Override
+  public List<ParameterDefn> parameters()
+  {
+    return parameters;
+  }
+
+  protected void requireSchema(String fnName, List<ColumnSpec> columns)
+  {
+    if (columns == null) {

Review Comment:
   Most databases allow tables with zero columns. Druid isn't a DB: it is a 
tool for specific kinds of apps. Those apps don't allow inputs with zero 
columns. So, to avoid blowing up later, we catch the problem as early as 
possible.
   
   More realistically, someone didn't realize that they have to tell MSQ the 
input schema, and just forgot to include it.
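
A minimal sketch of that kind of early check. The quoted hunk above stops at the `null` test, so treating an empty list the same way, the wording of the message, and the use of column names as plain strings are all assumptions of this sketch rather than what `requireSchema` actually does.

```java
import java.util.Arrays;
import java.util.List;

class RequireSchemaSketch
{
  /**
   * Hypothetical version of the check: fail early, with a hint, when the
   * caller did not supply any columns for the external table.
   */
  static void requireSchema(String fnName, List<String> columns)
  {
    if (columns == null || columns.isEmpty()) {
      throw new IllegalArgumentException(
          String.format("Function %s requires a schema: list the input columns after the call", fnName)
      );
    }
  }

  public static void main(String[] args)
  {
    requireSchema("http", Arrays.asList("x", "y", "z"));   // fine
    try {
      requireSchema("http", null);
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```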



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModule.java:
##########
@@ -64,7 +64,7 @@ public void configure(Binder binder)
   }
 
   @Provides
-  @Singleton
+  @LazySingleton

Review Comment:
   @imply-cheddar, thanks for your strident note. As it turns out, we can't 
_quite_ achieve what you ask. There is one other use of `@Singleton` in the 
code base, which I added in a previous PR. It handles the case in which Guice 
creates an instance implicitly. You indicated that you wanted only explicit 
bindings, but had to turn off that protection for some reason or other. The 
result is that, when Guice is used in tests, it will occasionally find itself 
implicitly instantiating some instances.
   
   There are times when we need those implicit instances to be singletons. 
These implicit singletons are where the `@Singleton` or `@LazySingleton` tags 
are needed. (Again, the binding is "implicit" if there is no explicit binding, 
which means we didn't use `@LazySingleton` at that time.)
   
   This is an example of a class that sometimes gets instantiated implicitly. 
So, we have to mark the class as `@[Lazy]Singleton`.
   
   There is one other place in the code where changing `@Singleton` to 
`@LazySingleton` causes Guice to crash. Why? I didn't take the time to figure it 
out. That line _didn't_ change in this PR, but is the one we should be 
discussing.
   
   To achieve @imply-cheddar's dictate, we have to turn off implicit 
instantiation. By so doing, we eliminate the need to annotate classes.



##########
sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java:
##########
@@ -145,49 +174,41 @@ public Consistency getConsistency()
   }
 
   /**
-   * Convert the actual arguments to SQL external table function into a catalog
-   * resolved table, then convert that to an external table spec usable by MSQ.
-   *
-   * @param tableDefn catalog definition of the kind of external table
-   * @param parameters the parameters to the SQL table macro
-   * @param arguments the arguments that match the parameters. Optional arguments
-   *                  may be null
-   * @param schema    the external table schema provided by the EXTEND clause
-   * @param jsonMapper the JSON mapper to use for value conversions
-   * @return a spec with the three values that MSQ needs to create an external table
+   * Convert the list of Calcite function arguments to a map of non-null arguments.
+   * The resulting map must be mutable as processing may rewrite values.
    */
-  public static ExternalTableSpec convertArguments(
-      final ExternalTableDefn tableDefn,
-      final List<FunctionParameter> parameters,
-      final List<Object> arguments,
-      final SqlNodeList schema,
-      final ObjectMapper jsonMapper
+  public static Map<String, Object> convertArguments(
+      final TableFunction fn,
+      final List<Object> arguments
   )
   {
-    final TableBuilder builder = TableBuilder.of(tableDefn);
-    for (int i = 0; i < parameters.size(); i++) {
-      String name = parameters.get(i).getName();
-      Object value = arguments.get(i);
-      if (value == null) {
-        continue;
+    final List<TableFunction.ParameterDefn> params = fn.parameters();
+    final Map<String, Object> argMap = new HashMap<>();
+    for (int i = 0; i < arguments.size(); i++) {
+      final Object value = arguments.get(i);
+      if (value != null) {
+        argMap.put(params.get(i).name(), value);
       }
-      PropertyDefn<?> prop = tableDefn.property(name);
-      builder.property(name, prop.decodeSqlValue(value, jsonMapper));
     }
+    return argMap;
+  }
 
-    // Converts from a list of (identifier, type, ...) pairs to
-    // a Druid row signature. The schema itself comes from the
-    // Druid-specific EXTEND syntax added to the parser.
+  /**
+   * Converts from a list of (identifier, type, ...) pairs to
+   * list of column specs. The schema itself comes from the
+   * Druid-specific EXTEND syntax added to the parser.
+   */
+  public static List<ColumnSpec> convertColumns(SqlNodeList schema)
+  {
+    final List<ColumnSpec> columns = new ArrayList<>();
     for (int i = 0; i < schema.size(); i += 2) {
       final String name = convertName((SqlIdentifier) schema.get(i));
-      String sqlType = convertType(name, (SqlDataTypeSpec) schema.get(i + 1));
-      builder.column(name, sqlType);
+      final String sqlType = convertType(name, (SqlDataTypeSpec) schema.get(i + 1));
+      columns.add(new ColumnSpec(name, sqlType, null));

Review Comment:
   We're in code that is processing the _input schema_ for an external table. 
This is not code for handling a data source table. MSQ requires that an input 
schema MUST define the column type. If the type was not given, we should have 
failed validation at some earlier point as the syntax would not have been valid.
   
   By asking about missing null column types, you may be asking about the 
behavior of columns defined for a data source, which is a topic handled 
elsewhere.
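   
   As a rough illustration of the point above (not the actual Druid code), 
here is a minimal, self-contained sketch of walking a flat list of 
(name, type) pairs and rejecting any column that lacks a type. `SchemaPairs` 
and `Column` are hypothetical names, plain strings stand in for Calcite's 
`SqlIdentifier` and `SqlDataTypeSpec`, and the explicit type check is an 
assumption made for the sketch: in the real flow a missing type should 
already have been rejected earlier as invalid syntax.

```java
import java.util.ArrayList;
import java.util.List;

public class SchemaPairs
{
  /** Hypothetical stand-in for a column spec: a name plus a required SQL type. */
  public static final class Column
  {
    public final String name;
    public final String sqlType;

    Column(String name, String sqlType)
    {
      this.name = name;
      this.sqlType = sqlType;
    }
  }

  /**
   * Walk a flat list laid out as (name, type, name, type, ...) and build
   * one Column per pair. An input schema must give every column a type,
   * so a missing type is reported as an error rather than stored as null.
   */
  public static List<Column> convertColumns(List<String> schema)
  {
    if (schema.size() % 2 != 0) {
      throw new IllegalArgumentException("Schema must be a list of (name, type) pairs");
    }
    final List<Column> columns = new ArrayList<>();
    for (int i = 0; i < schema.size(); i += 2) {
      final String name = schema.get(i);
      final String sqlType = schema.get(i + 1);
      if (sqlType == null || sqlType.isEmpty()) {
        throw new IllegalArgumentException("Column [" + name + "] requires a type");
      }
      columns.add(new Column(name, sqlType));
    }
    return columns;
  }
}
```

   For example, `SchemaPairs.convertColumns(Arrays.asList("x", "VARCHAR", "z", 
"BIGINT"))` yields two columns, while a list with a null type throws 
`IllegalArgumentException`.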



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

