FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283903488
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##########
@@ -46,42 +49,56 @@ public class FileCatalogStore extends AbstractCatalogStore {
private static final Logger LOG =
LoggerFactory.getLogger(FileCatalogStore.class);
- private static final String FILE_EXTENSION = ".yaml";
+ static final String FILE_EXTENSION = ".yaml";
- /** The directory path where catalog configurations will be stored. */
- private final String catalogStoreDirectory;
-
- /** The character set to use when reading and writing catalog files. */
- private final String charset;
+ /** The YAML mapper to use when reading and writing catalog files. */
+ private static final YAMLMapper YAML_MAPPER =
+ new
YAMLMapper().disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER);
- /** The YAML parser to use when reading and writing catalog files. */
- private final Yaml yaml = new Yaml();
+ /** The directory path where catalog configurations will be stored. */
+ private final Path catalogStorePath;
/**
* Creates a new {@link FileCatalogStore} instance with the specified
directory path.
*
- * @param catalogStoreDirectory the directory path where catalog
configurations will be stored
+ * @param catalogStorePath the directory path where catalog configurations
will be stored
*/
- public FileCatalogStore(String catalogStoreDirectory, String charset) {
- this.catalogStoreDirectory = catalogStoreDirectory;
- this.charset = charset;
+ public FileCatalogStore(String catalogStorePath) {
+ this.catalogStorePath = new Path(catalogStorePath);
}
/**
* Opens the catalog store and initializes the catalog file map.
*
- * @throws CatalogException if the catalog store directory does not exist
or if there is an
- * error reading the directory
+ * @throws CatalogException if the catalog store directory does not exist,
is not a directory, or
+ * if there is an error reading the directory
*/
@Override
public void open() throws CatalogException {
- super.open();
-
try {
+ FileSystem fs = catalogStorePath.getFileSystem();
+ if (!fs.exists(catalogStorePath)) {
+ throw new CatalogException(
+ String.format(
+ "Failed to open catalog store. The catalog
store directory %s does not exist.",
+ catalogStorePath));
+ }
- } catch (Throwable e) {
- throw new CatalogException("Failed to open file catalog store
directory", e);
+ if (!fs.getFileStatus(catalogStorePath).isDir()) {
+ throw new CatalogException(
+ String.format(
+ "Failed to open catalog store. The given
catalog store path %s is not a directory.",
+ catalogStorePath));
+ }
+ } catch (CatalogException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to open file catalog store directory %s.",
catalogStorePath),
+ e);
}
+ super.open();
Review Comment:
Generally, `super.open()` is called first. Why is it called last here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]