This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e280ffc8db37697104809c9fed8b0ed2a850372c Author: JingsongLi <[email protected]> AuthorDate: Tue Feb 11 14:28:02 2020 +0800 [FLINK-15912][table-planner] Support create table source/sink by context in legacy planner --- .../flink/table/factories/TableFactoryUtil.java | 12 ---- .../flink/table/catalog/CatalogCalciteSchema.java | 11 +++- .../table/catalog/CatalogManagerCalciteSchema.java | 8 ++- .../flink/table/catalog/DatabaseCalciteSchema.java | 27 +++++--- .../flink/table/api/internal/TableEnvImpl.scala | 15 ++--- .../apache/flink/table/planner/StreamPlanner.scala | 16 ++--- .../table/catalog/DatabaseCalciteSchemaTest.java | 4 +- .../table/sqlexec/SqlToOperationConverterTest.java | 2 +- .../table/api/stream/sql/TableFactoryTest.scala | 74 ++++++++++++++++++++++ .../table/utils/TestContextTableFactory.scala | 69 ++++++++++++++++++++ 10 files changed, 191 insertions(+), 47 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index e33d55c..bea892e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -21,7 +21,6 @@ package org.apache.flink.table.factories; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; @@ -106,17 +105,6 @@ public class TableFactoryUtil { /** * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog. */ - public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, CatalogTable catalogTable, ObjectPath tablePath) { - TableFactory tableFactory = catalog.getTableFactory().orElse(null); - if (tableFactory instanceof TableSinkFactory) { - return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(tablePath, catalogTable)); - } - return Optional.empty(); - } - - /** - * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog. - */ public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) { TableFactory tableFactory = catalog.getTableFactory().orElse(null); if (tableFactory instanceof TableSinkFactory) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java index 5be5a79..f5dc057 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableConfig; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.rel.type.RelProtoDataType; @@ -44,11 +45,17 @@ public class CatalogCalciteSchema implements Schema { private final boolean isStreamingMode; private final String catalogName; private final CatalogManager catalogManager; + private final TableConfig tableConfig; - public CatalogCalciteSchema(boolean isStreamingMode, String catalogName, CatalogManager catalogManager) { + public CatalogCalciteSchema( + boolean isStreamingMode, + String catalogName, + CatalogManager catalogManager, + TableConfig tableConfig) { this.isStreamingMode = isStreamingMode; this.catalogName = catalogName; this.catalogManager = catalogManager; + this.tableConfig = tableConfig; } /** @@ -60,7 +67,7 @@ public class CatalogCalciteSchema implements Schema { @Override public Schema getSubSchema(String schemaName) { if (catalogManager.schemaExists(catalogName, schemaName)) { - return new DatabaseCalciteSchema(isStreamingMode, schemaName, catalogName, catalogManager); + return new DatabaseCalciteSchema(isStreamingMode, schemaName, catalogName, catalogManager, tableConfig); } else { return null; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java index e7376fc..1ec9783 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableConfig; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.rel.type.RelProtoDataType; @@ -43,10 +44,13 @@ import java.util.Set; public class CatalogManagerCalciteSchema implements Schema { private final CatalogManager catalogManager; + private final TableConfig tableConfig; private boolean isStreamingMode; - public CatalogManagerCalciteSchema(CatalogManager catalogManager, boolean isStreamingMode) { + public CatalogManagerCalciteSchema( + CatalogManager catalogManager, TableConfig tableConfig, boolean isStreamingMode) { this.catalogManager = catalogManager; + this.tableConfig = tableConfig; this.isStreamingMode = isStreamingMode; } @@ -83,7 +87,7 @@ public class CatalogManagerCalciteSchema implements Schema { @Override public Schema getSubSchema(String name) { if (catalogManager.schemaExists(name)) { - return new CatalogCalciteSchema(isStreamingMode, name, catalogManager); + return new CatalogCalciteSchema(isStreamingMode, name, catalogManager, tableConfig); } else { return null; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java index 1dd5b23..6226dff 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java @@ -18,12 +18,14 @@ package org.apache.flink.table.catalog; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.calcite.FlinkTypeFactory; import org.apache.flink.table.factories.TableFactory; import org.apache.flink.table.factories.TableFactoryUtil; import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.factories.TableSourceFactoryContextImpl; import org.apache.flink.table.plan.schema.TableSinkTable; import org.apache.flink.table.plan.schema.TableSourceTable; import org.apache.flink.table.plan.stats.FlinkStatistic; @@ -57,16 +59,19 @@ class DatabaseCalciteSchema implements Schema { private final String catalogName; private final String databaseName; private final CatalogManager catalogManager; + private final TableConfig tableConfig; public DatabaseCalciteSchema( boolean isStreamingMode, String databaseName, String catalogName, - CatalogManager catalogManager) { + CatalogManager catalogManager, + TableConfig tableConfig) { this.isStreamingMode = isStreamingMode; this.databaseName = databaseName; this.catalogName = catalogName; this.catalogManager = catalogManager; + this.tableConfig = tableConfig; } @Override @@ -83,20 +88,20 @@ class DatabaseCalciteSchema implements Schema { .flatMap(Catalog::getTableFactory) .orElse(null); } - return convertTable(identifier.toObjectPath(), table, tableFactory); + return convertTable(identifier, table, tableFactory); }) .orElse(null); } - private Table convertTable(ObjectPath tablePath, CatalogBaseTable table, @Nullable TableFactory tableFactory) { + private Table convertTable(ObjectIdentifier identifier, CatalogBaseTable table, @Nullable TableFactory tableFactory) { if (table instanceof QueryOperationCatalogView) { return QueryOperationCatalogViewTable.createCalciteTable(((QueryOperationCatalogView) table)); } else if (table instanceof ConnectorCatalogTable) { return convertConnectorTable((ConnectorCatalogTable<?, ?>) table); } else if (table instanceof CatalogTable) { - return convertCatalogTable(tablePath, (CatalogTable) table, tableFactory); + return convertCatalogTable(identifier, (CatalogTable) table, tableFactory); } else if (table instanceof CatalogView) { - return convertCatalogView(tablePath, (CatalogView) table); + return convertCatalogView(identifier.getObjectName(), (CatalogView) table); } else { throw new TableException("Unsupported table type: " + table); } @@ -125,17 +130,19 @@ class DatabaseCalciteSchema implements Schema { } } - private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table, @Nullable TableFactory tableFactory) { + private Table convertCatalogTable(ObjectIdentifier identifier, CatalogTable table, @Nullable TableFactory tableFactory) { final TableSource<?> tableSource; + final TableSourceFactory.Context context = new TableSourceFactoryContextImpl( + identifier, table, tableConfig.getConfiguration()); if (tableFactory != null) { if (tableFactory instanceof TableSourceFactory) { - tableSource = ((TableSourceFactory) tableFactory).createTableSource(tablePath, table); + tableSource = ((TableSourceFactory) tableFactory).createTableSource(context); } else { throw new TableException( "Cannot query a sink-only table. TableFactory provided by catalog must implement TableSourceFactory"); } } else { - tableSource = TableFactoryUtil.findAndCreateTableSource(table); + tableSource = TableFactoryUtil.findAndCreateTableSource(context); } if (!(tableSource instanceof StreamTableSource)) { @@ -153,14 +160,14 @@ class DatabaseCalciteSchema implements Schema { ); } - private Table convertCatalogView(ObjectPath tableName, CatalogView table) { + private Table convertCatalogView(String tableName, CatalogView table) { TableSchema schema = table.getSchema(); return new ViewTable( null, typeFactory -> ((FlinkTypeFactory) typeFactory).buildLogicalRowType(schema), table.getExpandedQuery(), Arrays.asList(catalogName, databaseName), - Arrays.asList(catalogName, databaseName, tableName.getObjectName()) + Arrays.asList(catalogName, databaseName, tableName) ); } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 3faee81..9fd0278 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.catalog.exceptions.{TableNotExistException => _, _ import org.apache.flink.table.delegation.Parser import org.apache.flink.table.expressions._ import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup -import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory} +import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl} import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _} import org.apache.flink.table.module.{Module, ModuleManager} import org.apache.flink.table.operations.ddl._ @@ -102,7 +102,7 @@ abstract class TableEnvImpl( new PlanningConfigurationBuilder( config, functionCatalog, - asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)), + asRootSchema(new CatalogManagerCalciteSchema(catalogManager, config, isStreamingMode)), expressionBridge) private val parser: Parser = new ParserImpl( @@ -741,18 +741,15 @@ abstract class TableEnvImpl( val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName) val catalogTable = s.asInstanceOf[CatalogTable] + val context = new TableSinkFactoryContextImpl( + objectIdentifier, catalogTable, config.getConfiguration) if (catalog.isPresent && catalog.get().getTableFactory.isPresent) { - val sink = TableFactoryUtil.createTableSinkForCatalogTable( - catalog.get(), - catalogTable, - objectIdentifier.toObjectPath) + val sink = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context) if (sink.isPresent) { return Option(sink.get()) } } - val sinkProperties = catalogTable.toProperties - Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties) - .createTableSink(sinkProperties)) + Option(TableFactoryUtil.findAndCreateTableSink(context)) case _ => None } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala index 6592f1b..46823f2 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala @@ -30,7 +30,7 @@ import org.apache.flink.table.delegation.{Executor, Parser, Planner} import org.apache.flink.table.executor.StreamExecutor import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.{ExpressionBridge, PlannerExpression, PlannerExpressionConverter, PlannerTypeInferenceUtilImpl} -import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory} +import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl} import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode import org.apache.flink.table.operations._ import org.apache.flink.table.plan.StreamOptimizer @@ -51,7 +51,6 @@ import _root_.java.util import _root_.java.util.Objects import _root_.java.util.function.{Supplier => JSupplier} -import _root_.scala.collection.JavaConversions._ import _root_.scala.collection.JavaConverters._ /** @@ -77,7 +76,7 @@ class StreamPlanner( functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE) private val internalSchema: CalciteSchema = - asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true)) + asRootSchema(new CatalogManagerCalciteSchema(catalogManager, config, true)) // temporary bridge between API and planner private val expressionBridge: ExpressionBridge[PlannerExpression] = @@ -434,18 +433,15 @@ class StreamPlanner( case Some(s) if s.isInstanceOf[CatalogTable] => val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName) val catalogTable = s.asInstanceOf[CatalogTable] + val context = new TableSinkFactoryContextImpl( + objectIdentifier, catalogTable, config.getConfiguration) if (catalog.isPresent && catalog.get().getTableFactory.isPresent) { - val sink = TableFactoryUtil.createTableSinkForCatalogTable( - catalog.get(), - catalogTable, - objectIdentifier.toObjectPath) + val sink = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context) if (sink.isPresent) { return Option(sink.get()) } } - val sinkProperties = catalogTable.toProperties - Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties) - .createTableSink(sinkProperties)) + Option(TableFactoryUtil.findAndCreateTableSink(context)) case _ => None } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java index 69e1438..a850f48 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.catalog; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.TestExternalTableSourceFactory.TestExternalTableSource; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -55,7 +56,8 @@ public class DatabaseCalciteSchemaTest { DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(true, databaseName, catalogName, - catalogManager); + catalogManager, + new TableConfig()); catalog.createTable(new ObjectPath(databaseName, tableName), new TestCatalogBaseTable(), false); Table table = calciteSchema.getTable(tableName); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java index 8bd077b..036cc6a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java @@ -95,7 +95,7 @@ public class SqlToOperationConverterTest { private final PlanningConfigurationBuilder planningConfigurationBuilder = new PlanningConfigurationBuilder(tableConfig, functionCatalog, - asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)), + asRootSchema(new CatalogManagerCalciteSchema(catalogManager, tableConfig, false)), new ExpressionBridge<>(functionCatalog, PlannerExpressionConverter.INSTANCE())); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TableFactoryTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TableFactoryTest.scala new file mode 100644 index 0000000..9bdbfd1 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TableFactoryTest.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectIdentifier} +import org.apache.flink.table.factories.TableFactory +import org.apache.flink.table.utils.{TableTestBase, TestContextTableFactory} + +import org.junit.{Assert, Test} + +import java.util.Optional + +class TableFactoryTest extends TableTestBase { + + private val util = streamTestUtil() + + @Test + def testTableSourceSinkFactory(): Unit = { + val factory = new TestContextTableFactory( + ObjectIdentifier.of("cat", "default", "t1"), + ObjectIdentifier.of("cat", "default", "t2")) + util.tableEnv.getConfig.getConfiguration.setBoolean(TestContextTableFactory.REQUIRED_KEY, true) + util.tableEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") { + override def getTableFactory: Optional[TableFactory] = Optional.of(factory) + }) + util.tableEnv.useCatalog("cat") + + val sourceDDL = + """ + |create table t1( + | a int, + | b varchar, + | c int + |) with ( + | 'connector' = 'COLLECTION' + |) + """.stripMargin + val sinkDDL = + """ + |create table t2( + | a int, + | b int + |) with ( + | 'connector' = 'COLLECTION' + |) + """.stripMargin + val query = + """ + |insert into t2 + |select t1.a, t1.c from t1 + """.stripMargin + util.tableEnv.sqlUpdate(sourceDDL) + util.tableEnv.sqlUpdate(sinkDDL) + util.tableEnv.sqlUpdate(query) + Assert.assertTrue(factory.hasInvokedSource) + Assert.assertTrue(factory.hasInvokedSink) + } +} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala new file mode 100644 index 0000000..611076f --- /dev/null +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils + +import org.apache.flink.configuration.{ConfigOption, ConfigOptions} +import org.apache.flink.table.catalog.ObjectIdentifier +import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactory, TableSourceFactory} +import org.apache.flink.table.utils.TestContextTableFactory.REQUIRED_KEY +import org.apache.flink.table.sinks.TableSink +import org.apache.flink.table.sources.TableSource + +import org.junit.Assert + +import java.{lang, util} + +/** + * Test [[TableSourceFactory]] and [[TableSinkFactory]] for context. + */ +class TestContextTableFactory[T]( + sourceIdentifier: ObjectIdentifier, + sinkIdentifier: ObjectIdentifier) + extends TableSourceFactory[T] with TableSinkFactory[T] { + + var hasInvokedSource = false + var hasInvokedSink = false + + override def requiredContext(): util.Map[String, String] = { + throw new UnsupportedOperationException + } + + override def supportedProperties(): util.List[String] = { + throw new UnsupportedOperationException + } + + override def createTableSource(context: TableSourceFactory.Context): TableSource[T] = { + Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY)) + Assert.assertEquals(sourceIdentifier, context.getObjectIdentifier) + hasInvokedSource = true + TableFactoryUtil.findAndCreateTableSource(context) + } + + override def createTableSink(context: TableSinkFactory.Context): TableSink[T] = { + Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY)) + Assert.assertEquals(sinkIdentifier, context.getObjectIdentifier) + hasInvokedSink = true + TableFactoryUtil.findAndCreateTableSink(context) + } +} + +object TestContextTableFactory{ + val REQUIRED_KEY: ConfigOption[lang.Boolean] = ConfigOptions + .key("testing.required.key").booleanType().defaultValue(false) +}
