Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5564#discussion_r170232160
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
---
@@ -246,13 +394,93 @@ class DescriptorProperties(normalizeKeys: Boolean =
true) {
Some(schemaBuilder.build())
}
+ /**
+ * Returns a table schema under the given key if it exists.
+ */
+ def getOptionalTableSchema(key: String): Optional[TableSchema] =
toJava(getTableSchema(key))
+
+ /**
+ * Returns the type information under the given key if it exists.
+ */
+ def getType(key: String): Option[TypeInformation[_]] = {
+ properties.get(key).map(TypeStringUtils.readTypeInfo)
+ }
+
+ /**
+ * Returns the type information under the given key if it exists.
+ * This method is intended for Java code.
+ */
+ def getOptionalType(key: String): Optional[TypeInformation[_]] = {
+ toJava(getType(key))
+ }
+
+ /**
+ * Returns a prefix subset of properties.
+ */
+ def getPrefix(prefixKey: String): Map[String, String] = {
+ val prefix = prefixKey + '.'
+ properties.filterKeys(_.startsWith(prefix)).toSeq.map{ case (k, v) =>
+ k.substring(prefix.length) -> v // remove prefix
+ }.toMap
+ }
+
+ /**
+ * Returns a prefix subset of properties.
+ * This method is intended for Java code.
+ */
+ def getPrefixMap(prefixKey: String): JMap[String, String] =
getPrefix(prefixKey).asJava
--- End diff --
I find it confusing that methods which do the same thing have different names.
I'd just remove the Scala methods.
---