chl-wxp commented on code in PR #6029:
URL: https://github.com/apache/seatunnel/pull/6029#discussion_r1445554216
##########
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java:
##########
@@ -208,13 +211,70 @@ public CatalogTable getTable(TablePath tablePath)
@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
- throw new UnsupportedOperationException();
+ this.createTable(
+ StarRocksSaveModeUtil.fillingCreateSql(
+ template,
+ table.getTableId().getDatabaseName(),
+ table.getTableId().getTableName(),
+ table.getTableSchema()));
}
@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ if (ignoreIfNotExists) {
+ conn.createStatement().execute("DROP TABLE IF EXISTS " +
tablePath.getFullName());
+ } else {
+ conn.createStatement()
+ .execute(String.format("DROP TABLE %s",
tablePath.getFullName()));
+ }
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed listing database in catalog %s",
catalogName), e);
+ }
+ }
+
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ if (ignoreIfNotExists) {
+ conn.createStatement()
+ .execute(String.format("TRUNCATE TABLE %s",
tablePath.getFullName()));
+ }
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed TRUNCATE TABLE in catalog %s",
tablePath.getFullName()),
+ e);
+ }
+ }
+
+ public void executeSql(TablePath tablePath, String sql) {
+ try (Connection connection = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ try (PreparedStatement ps = connection.prepareStatement(sql)) {
+ // Will there exist concurrent drop for one table?
+ ps.execute();
+ } catch (SQLException e) {
+ throw new CatalogException(String.format("Failed executeSql
error %s", sql), e);
+ }
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Failed EXECUTE SQL in
catalog %s", sql), e);
+ }
+ }
+
+ public boolean isExistsData(TablePath tablePath) {
+ try (Connection connection = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ String sql = String.format("select * from %s limit 1",
tablePath.getFullName());
+ PreparedStatement ps = connection.prepareStatement(sql);
Review Comment:
uptaded
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]