[
https://issues.apache.org/jira/browse/KYLIN-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16595033#comment-16595033
]
ASF GitHub Bot commented on KYLIN-3456:
---------------------------------------
shaofengshi closed pull request #214: KYLIN-3456, KYLIN-3456, KYLIN-3458
URL: https://github.com/apache/kylin/pull/214
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 6c177a0627..23dcaf37eb 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -1108,7 +1108,7 @@ public SnapshotTable buildSnapshotTable(CubeSegment
cubeSeg, String lookupTable)
TableDesc tableDesc = new
TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
IReadableTable hiveTable =
SourceManager.createReadableTable(tableDesc);
- SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable,
tableDesc);
+ SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable,
tableDesc, cubeSeg.getConfig());
CubeDesc cubeDesc = cubeSeg.getCubeDesc();
if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
diff --git
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index dd90b3303d..d1895ba9b8 100644
---
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -115,7 +115,7 @@ public void removeSnapshot(String resourcePath) throws
IOException {
snapshotCache.invalidate(resourcePath);
}
- public SnapshotTable buildSnapshot(IReadableTable table, TableDesc
tableDesc) throws IOException {
+ public SnapshotTable buildSnapshot(IReadableTable table, TableDesc
tableDesc, KylinConfig cubeConfig) throws IOException {
SnapshotTable snapshot = new SnapshotTable(table,
tableDesc.getIdentity());
snapshot.updateRandomUuid();
@@ -125,8 +125,8 @@ public SnapshotTable buildSnapshot(IReadableTable table,
TableDesc tableDesc) th
return getSnapshotTable(dup);
}
- if (snapshot.getSignature().getSize() / 1024 / 1024 >
config.getTableSnapshotMaxMB()) {
- throw new IllegalStateException("Table snapshot should be no
greater than " + config.getTableSnapshotMaxMB() //
+ if ((float) snapshot.getSignature().getSize() / 1024 / 1024 >
cubeConfig.getTableSnapshotMaxMB()) {
+ throw new IllegalStateException("Table snapshot should be no
greater than " + cubeConfig.getTableSnapshotMaxMB() //
+ " MB, but " + tableDesc + " size is " +
snapshot.getSignature().getSize());
}
diff --git
a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/LookupTableTest.java
b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/LookupTableTest.java
index f53e8fe726..5251c1cac8 100644
---
a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/LookupTableTest.java
+++
b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/LookupTableTest.java
@@ -18,6 +18,7 @@
package org.apache.kylin.dict.lookup;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -30,6 +31,7 @@
import org.apache.kylin.dict.TrieDictionaryForest;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -115,6 +117,47 @@ public void testGetClassName(){
}
+ @Test
+ public void testSnapshotOverVolume() throws IOException {
+ KylinConfig cubeConfig =
KylinConfig.createKylinConfig(getTestConfig());
+ cubeConfig.setProperty("kylin.snapshot.max-mb", "0");
+ String tableName = "DEFAULT.TEST_KYLIN_FACT";
+ TableDesc tableDesc =
TableMetadataManager.getInstance(getTestConfig()).getTableDesc(tableName,
"default");
+ IReadableTable mockTable = new IReadableTable() {
+ @Override
+ public TableReader getReader() throws IOException {
+ return new TableReader() {
+ @Override
+ public boolean next() throws IOException {
+ return false;
+ }
+ @Override
+ public String[] getRow() {
+ return new String[0];
+ }
+ @Override
+ public void close() throws IOException {
+ }
+ };
+ }
+ @Override
+ public TableSignature getSignature() throws IOException {
+ return new IReadableTable.TableSignature("", 2 * 1024 * 1024,
+ System.currentTimeMillis());
+ }
+ @Override
+ public boolean exists() throws IOException {
+ return false;
+ }
+ };
+ SnapshotManager snapshotManager = getSnapshotManager();
+ try {
+ snapshotManager.buildSnapshot(mockTable, tableDesc,
cubeConfig).getResourcePath();
+ } catch (IllegalStateException ex) {
+ Assert.assertTrue(ex.getLocalizedMessage().startsWith("Table
snapshot should be no greater than 0 MB"));
+ }
+ }
+
private String millis(String dateStr) {
return String.valueOf(DateFormat.stringToMillis(dateStr));
}
diff --git
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 7fc9eafe16..dbbfc3962c 100644
---
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -151,25 +151,31 @@ public final ExecuteResult execute(ExecutableContext
executableContext) throws E
try {
onExecuteStart(executableContext);
- Throwable exception;
+ Throwable catchedException;
+ Throwable realException;
do {
if (retry > 0) {
logger.info("Retry " + retry);
}
- exception = null;
+ catchedException = null;
result = null;
try {
result = doWork(executableContext);
} catch (Throwable e) {
logger.error("error running Executable: " +
this.toString());
- exception = e;
+ catchedException = e;
}
retry++;
- } while (needRetry(this.retry, exception)); //exception in
ExecuteResult should handle by user itself.
+ realException = catchedException != null ? catchedException
+ : (result.getThrowable() != null ?
result.getThrowable() : null);
- if (exception != null) {
- onExecuteError(exception, executableContext);
- throw new ExecuteException(exception);
+ //don't invoke retry on ChainedExecutable
+ } while (needRetry(this.retry, realException)); //exception in
ExecuteResult should handle by user itself.
+
+ //check exception in result to avoid retry on
ChainedExecutable(only need to retry on subtask actually)
+ if (realException != null) {
+ onExecuteError(realException, executableContext);
+ throw new ExecuteException(realException);
}
onExecuteFinishedWithRetry(result, executableContext);
@@ -468,8 +474,9 @@ protected final boolean isPaused() {
// Retry will happen in below cases:
// 1) if property "kylin.job.retry-exception-classes" is not set or is
null, all jobs with exceptions will retry according to the retry times.
// 2) if property "kylin.job.retry-exception-classes" is set and is not
null, only jobs with the specified exceptions will retry according to the retry
times.
- public static boolean needRetry(int retry, Throwable t) {
- if (retry > KylinConfig.getInstanceFromEnv().getJobRetry() || t ==
null) {
+ public boolean needRetry(int retry, Throwable t) {
+ if (retry > KylinConfig.getInstanceFromEnv().getJobRetry() || t == null
+ || (this instanceof DefaultChainedExecutable)) {
return false;
} else {
return isRetryableException(t.getClass().getName());
diff --git
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index 3c1a7ea927..9d4c5753d8 100644
---
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -37,7 +37,6 @@
import org.apache.kylin.job.RunningTestExecutable;
import org.apache.kylin.job.SelfStopExecutable;
import org.apache.kylin.job.SucceedTestExecutable;
-import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
@@ -238,13 +237,21 @@ public void testSchedulerRestart() throws Exception {
@Test
public void testRetryableException() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task = new ErrorTestExecutable();
+ job.addTask(task);
+
System.setProperty("kylin.job.retry", "3");
- Assert.assertTrue(AbstractExecutable.needRetry(1, new Exception("")));
- Assert.assertFalse(AbstractExecutable.needRetry(1, null));
- Assert.assertFalse(AbstractExecutable.needRetry(4, new Exception("")));
+
+ //don't retry on DefaultChainedExecutable, only retry on subtasks
+ Assert.assertFalse(job.needRetry(1, new Exception("")));
+ Assert.assertTrue(task.needRetry(1, new Exception("")));
+ Assert.assertFalse(task.needRetry(1, null));
+ Assert.assertFalse(task.needRetry(4, new Exception("")));
System.setProperty("kylin.job.retry-exception-classes",
"java.io.FileNotFoundException");
- Assert.assertTrue(AbstractExecutable.needRetry(1, new
FileNotFoundException()));
- Assert.assertFalse(AbstractExecutable.needRetry(1, new Exception("")));
+
+ Assert.assertTrue(task.needRetry(1, new FileNotFoundException()));
+ Assert.assertFalse(task.needRetry(1, new Exception("")));
}
}
diff --git
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
index 783ded065b..c64694c284 100644
---
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
+++
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
@@ -63,7 +63,7 @@ protected ExecuteResult doWork(ExecutableContext context)
throws ExecuteExceptio
TableDesc tableDesc = metaMgr.getTableDesc(lookupTableName,
cube.getProject());
IReadableTable hiveTable =
SourceManager.createReadableTable(tableDesc);
logger.info("take snapshot for table:" + lookupTableName);
- SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable,
tableDesc);
+ SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable,
tableDesc, cube.getConfig());
logger.info("update snapshot path to cube metadata");
if (cubeDesc.isGlobalSnapshotTable(lookupTableName)) {
diff --git
a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
index 031da29c3e..872f570d76 100644
---
a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
+++
b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -57,7 +57,7 @@ public void basicTest() throws Exception {
String tableName = "EDW.TEST_SITES";
TableDesc tableDesc =
TableMetadataManager.getInstance(getTestConfig()).getTableDesc(tableName,
"default");
IReadableTable hiveTable =
SourceManager.createReadableTable(tableDesc);
- String snapshotPath = snapshotMgr.buildSnapshot(hiveTable,
tableDesc).getResourcePath();
+ String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc,
getTestConfig()).getResourcePath();
snapshotMgr.wipeoutCache();
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 64c14bc2b2..0cccca6c23 100644
---
a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++
b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -131,7 +131,7 @@ public StreamingRequest saveStreamingConfig(@RequestBody
StreamingRequest stream
boolean saveStreamingSuccess = false, saveKafkaSuccess = false;
try {
- tableService.addStreamingTable(tableDesc, project);
+ tableService.loadTableToProject(tableDesc, null, project);
} catch (IOException e) {
throw new BadRequestException("Failed to add streaming table.");
}
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index d8d802df5d..874800920e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -23,11 +23,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -70,7 +69,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -78,7 +76,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
@Component("tableService")
public class TableService extends BasicService {
@@ -123,13 +120,26 @@ public TableDesc getTableDescByName(String tableName,
boolean withExt, String pr
return table;
}
- public String[] loadHiveTablesToProject(String[] tables, String project)
throws Exception {
+ /**
+ * @return all loaded table names
+ * @throws Exception on error
+ */
+ public String[] loadHiveTablesToProject(String[] hiveTables, String
project) throws Exception {
aclEvaluate.checkProjectAdminPermission(project);
- List<Pair<TableDesc, TableExtDesc>> allMeta = getAllMeta(tables,
project);
- return loadHiveTablesToProject(project, allMeta);
+ List<Pair<TableDesc, TableExtDesc>> allMeta =
extractHiveTableMeta(hiveTables, project);
+ return loadTablesToProject(allMeta, project);
}
- String[] loadHiveTablesToProject(String project, List<Pair<TableDesc,
TableExtDesc>> allMeta) throws Exception {
+ /**
+ * @return all loaded table names
+ * @throws Exception on error
+ */
+ public String[] loadTableToProject(TableDesc tableDesc, TableExtDesc
extDesc, String project) throws IOException {
+ return loadTablesToProject(Lists.newArrayList(Pair.newPair(tableDesc,
extDesc)), project);
+ }
+
+ private String[] loadTablesToProject(List<Pair<TableDesc, TableExtDesc>>
allMeta, String project) throws IOException {
+ aclEvaluate.checkProjectAdminPermission(project);
// do schema check
TableMetadataManager metaMgr = getTableManager();
CubeManager cubeMgr = getCubeManager();
@@ -156,16 +166,18 @@ public TableDesc getTableDescByName(String tableName,
boolean withExt, String pr
}
metaMgr.saveSourceTable(tableDesc, project);
- TableExtDesc origExt =
metaMgr.getTableExt(tableDesc.getIdentity(), project);
- if (origExt == null || origExt.getProject() == null) {
- extDesc.setUuid(RandomUtil.randomUUID().toString());
- extDesc.setLastModified(0);
- } else {
- extDesc.setUuid(origExt.getUuid());
- extDesc.setLastModified(origExt.getLastModified());
+ if (extDesc != null) {
+ TableExtDesc origExt =
metaMgr.getTableExt(tableDesc.getIdentity(), project);
+ if (origExt == null || origExt.getProject() == null) {
+ extDesc.setUuid(UUID.randomUUID().toString());
+ extDesc.setLastModified(0);
+ } else {
+ extDesc.setUuid(origExt.getUuid());
+ extDesc.setLastModified(origExt.getLastModified());
+ }
+ extDesc.init(project);
+ metaMgr.saveTableExt(extDesc, project);
}
- extDesc.init(project);
- metaMgr.saveTableExt(extDesc, project);
saved.add(tableDesc.getIdentity());
}
@@ -175,8 +187,7 @@ public TableDesc getTableDescByName(String tableName,
boolean withExt, String pr
return result;
}
- private List<Pair<TableDesc, TableExtDesc>> getAllMeta(String[] tables,
String project) throws Exception {
- // de-dup
+ public List<Pair<TableDesc, TableExtDesc>> extractHiveTableMeta(String[]
tables, String project) throws Exception { // de-dup
SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
for (String fullTableName : tables) {
String[] parts = HadoopUtil.parseHiveTableName(fullTableName);
@@ -201,49 +212,6 @@ public TableDesc getTableDescByName(String tableName,
boolean withExt, String pr
return allMeta;
}
- public Map<String, String[]> loadHiveTables(String[] tableNames, String
project, boolean isNeedProfile)
- throws Exception {
- aclEvaluate.checkProjectAdminPermission(project);
- String submitter =
SecurityContextHolder.getContext().getAuthentication().getName();
- Map<String, String[]> result = new HashMap<String, String[]>();
-
- String[] loaded = loadHiveTablesToProject(tableNames, project);
- result.put("result.loaded", loaded);
- Set<String> allTables = new HashSet<String>();
- for (String tableName : tableNames) {
- allTables.add(normalizeHiveTableName(tableName));
- }
- for (String loadedTableName : loaded) {
- allTables.remove(loadedTableName);
- }
- String[] unloaded = new String[allTables.size()];
- allTables.toArray(unloaded);
- result.put("result.unloaded", unloaded);
- if (isNeedProfile) {
- calculateCardinalityIfNotPresent(loaded, submitter, project);
- }
- return result;
- }
-
- public Map<String, String[]> unloadHiveTables(String[] tableNames, String
project) throws IOException {
- aclEvaluate.checkProjectAdminPermission(project);
- Set<String> unLoadSuccess = Sets.newHashSet();
- Set<String> unLoadFail = Sets.newHashSet();
- Map<String, String[]> result = new HashMap<String, String[]>();
-
- for (String tableName : tableNames) {
- if (unloadHiveTable(tableName, project)) {
- unLoadSuccess.add(tableName);
- } else {
- unLoadFail.add(tableName);
- }
- }
-
- result.put("result.unload.success", (String[])
unLoadSuccess.toArray(new String[unLoadSuccess.size()]));
- result.put("result.unload.fail", (String[]) unLoadFail.toArray(new
String[unLoadFail.size()]));
- return result;
- }
-
private void addTableToProject(String[] tables, String project) throws
IOException {
getProjectManager().addTableDescToProject(tables, project);
}
@@ -297,19 +265,6 @@ public boolean unloadHiveTable(String tableName, String
project) throws IOExcept
return rtn;
}
- /**
- *
- * @param desc
- * @param project
- * @throws IOException
- */
- public void addStreamingTable(TableDesc desc, String project) throws
IOException {
- aclEvaluate.checkProjectAdminPermission(project);
- desc.setUuid(RandomUtil.randomUUID().toString());
- getTableManager().saveSourceTable(desc, project);
- addTableToProject(new String[] { desc.getIdentity() }, project);
- }
-
/**
*
* @param project
diff --git
a/server/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
b/server/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
index 29ab5fd4a1..6d98d60c8a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
@@ -18,9 +18,7 @@
package org.apache.kylin.rest.service;
-import com.google.common.collect.Lists;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
@@ -39,7 +37,7 @@ public void testLoadHiveTablesToProject() throws Exception {
TableMetadataManager tableMgr =
TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
TableDesc tableDesc = tableMgr.getTableDesc("TEST_KYLIN_FACT",
"default");
TableExtDesc tableExt = tableMgr.getTableExt(tableDesc);
- String[] defaults = tableService.loadHiveTablesToProject("default",
Lists.newArrayList(Pair.newPair(tableDesc, tableExt)));
+ String[] defaults = tableService.loadTableToProject(tableDesc,
tableExt, "default");
Assert.assertEquals("DEFAULT.TEST_KYLIN_FACT", defaults[0]);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Cube level's snapshot config does not work
> ------------------------------------------
>
> Key: KYLIN-3456
> URL: https://issues.apache.org/jira/browse/KYLIN-3456
> Project: Kylin
> Issue Type: Bug
> Reporter: Xinbei Fu
> Priority: Major
>
> The configuration kylin.snapshot.max-mb does not take effect when it is set at the cube level.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)