shaofengshi closed pull request #191: KYLIN-3485 Make unloading table more
flexible
URL: https://github.com/apache/kylin/pull/191
This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 2c5a9226ba..f79d0f0a7d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -19,6 +19,7 @@
package org.apache.kylin.source;
import java.io.Closeable;
+import java.io.IOException;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
@@ -55,4 +56,9 @@
* For testing purpose.
*/
ISampleDataDeployer getSampleDataDeployer();
+
+ /**
+ * Unload table.
+ */
+ void unloadTable(String tableName, String project) throws IOException;
}
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 786daa6a17..3c661f2aca 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -32,6 +32,7 @@
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeManager;
@@ -53,7 +54,6 @@
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.msg.Message;
import org.apache.kylin.rest.msg.MsgPicker;
@@ -62,11 +62,11 @@
import org.apache.kylin.rest.response.TableSnapshotResponse;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -277,8 +277,6 @@ public boolean unloadHiveTable(String tableName, String
project) throws IOExcept
return false;
}
- tableType = desc.getSourceType();
-
if (!modelService.isTableInModel(desc, project)) {
removeTableFromProject(tableName, project);
rtn = true;
@@ -293,20 +291,9 @@ public boolean unloadHiveTable(String tableName, String
project) throws IOExcept
metaMgr.removeSourceTable(tableName, project);
// remove streaming info
- if (tableType == 1) {
- StreamingConfig config = null;
- KafkaConfig kafkaConfig = null;
- try {
- config =
streamingService.getStreamingManager().getStreamingConfig(tableName);
- kafkaConfig = kafkaConfigService.getKafkaConfig(tableName,
project);
- streamingService.dropStreamingConfig(config, project);
- kafkaConfigService.dropKafkaConfig(kafkaConfig, project);
- rtn = true;
- } catch (Exception e) {
- rtn = false;
- logger.error(e.getLocalizedMessage(), e);
- }
- }
+ SourceManager sourceManager =
SourceManager.getInstance(KylinConfig.getInstanceFromEnv());
+ ISource source = sourceManager.getCachedSource(desc);
+ source.unloadTable(tableName, project);
return rtn;
}
diff --git
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index daf93d344b..938114c2d6 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -82,6 +82,11 @@ public ISampleDataDeployer getSampleDataDeployer() {
return new HiveMetadataExplorer();
}
+ @Override
+ public void unloadTable(String tableName, String project) throws
IOException {
+
+ }
+
@Override
public void close() throws IOException {
// not needed
diff --git
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index ae3bbc5108..37d119eefe 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -67,6 +67,11 @@ public ISampleDataDeployer getSampleDataDeployer() {
return new JdbcExplorer();
}
+ @Override
+ public void unloadTable(String tableName, String project) throws
IOException {
+
+ }
+
@Override
public void close() throws IOException {
// not needed
diff --git
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 70d37aadd1..264f2ce8a6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -37,6 +37,7 @@
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.ISampleDataDeployer;
import org.apache.kylin.source.ISource;
@@ -246,6 +247,19 @@ public ISampleDataDeployer getSampleDataDeployer() {
throw new UnsupportedOperationException();
}
+ @Override
+ public void unloadTable(String tableName, String project) throws
IOException {
+ StreamingConfig config;
+ KafkaConfig kafkaConfig;
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ StreamingManager streamingManager =
StreamingManager.getInstance(kylinConfig);
+ KafkaConfigManager kafkaConfigManager =
KafkaConfigManager.getInstance(kylinConfig);
+ config = streamingManager.getStreamingConfig(tableName);
+ kafkaConfig = kafkaConfigManager.getKafkaConfig(tableName);
+ streamingManager.removeStreamingConfig(config);
+ kafkaConfigManager.removeKafkaConfig(kafkaConfig);
+ }
+
@Override
public void close() throws IOException {
// not needed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services