TAJO-1143: TajoMaster, TajoWorker, and TajoClient should have diagnosis phase at startup
Closes #280 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a0d67bb6 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a0d67bb6 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a0d67bb6 Branch: refs/heads/master Commit: a0d67bb60ea995dc2c0ad014a8b71852e936cd56 Parents: 4bcd464 Author: Hyunsik Choi <[email protected]> Authored: Thu Dec 11 04:50:20 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Thu Dec 11 04:50:20 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/client/TajoClientImpl.java | 19 + tajo-common/pom.xml | 1 + .../java/org/apache/tajo/conf/TajoConf.java | 3 +- .../org/apache/tajo/rule/EvaluationContext.java | 55 +++ .../tajo/rule/EvaluationFailedException.java | 40 ++ .../org/apache/tajo/rule/EvaluationResult.java | 56 +++ .../org/apache/tajo/rule/SelfDiagnosisRule.java | 25 + .../tajo/rule/SelfDiagnosisRuleDefinition.java | 36 ++ .../tajo/rule/SelfDiagnosisRuleEngine.java | 154 ++++++ .../tajo/rule/SelfDiagnosisRuleProvider.java | 32 ++ .../tajo/rule/SelfDiagnosisRuleSession.java | 134 ++++++ .../tajo/rule/SelfDiagnosisRuleVisibility.java | 38 ++ .../apache/tajo/rule/base/BaseRuleProvider.java | 53 +++ .../base/CheckHadoopRuntimeVersionRule.java | 112 +++++ .../tajo/rule/base/TajoConfValidationRule.java | 90 ++++ ...g.apache.tajo.rule.SelfDiagnosisRuleProvider | 2 + .../org/apache/tajo/rule/TestRuleEngine.java | 146 ++++++ .../org/apache/tajo/rule/TestRuleSession.java | 465 +++++++++++++++++++ tajo-core/pom.xml | 1 + .../java/org/apache/tajo/master/TajoMaster.java | 15 + .../apache/tajo/master/rule/FileSystemRule.java | 110 +++++ .../tajo/master/rule/MasterRuleProvider.java | 53 +++ .../java/org/apache/tajo/worker/TajoWorker.java | 17 + .../ConnectivityCheckerRuleForTajoWorker.java | 91 ++++ .../tajo/worker/rule/WorkerRuleProvider.java | 53 +++ ...g.apache.tajo.rule.SelfDiagnosisRuleProvider 
| 3 + .../tajo/master/rule/TestMasterRules.java | 156 +++++++ 28 files changed, 1961 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 8add99f..6438503 100644 --- a/CHANGES +++ b/CHANGES @@ -20,6 +20,9 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-1143: TajoMaster, TajoWorker, and TajoClient should have + diagnosis phase at startup. (Jihun Kang via hyunsik) + TAJO-1236: Remove slow 'new String' operation in parquet format. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java index 1d637ed..dff8d65 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java @@ -19,6 +19,7 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -35,6 +36,10 @@ import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.jdbc.TajoResultSet; +import org.apache.tajo.rule.EvaluationContext; +import org.apache.tajo.rule.EvaluationFailedException; +import org.apache.tajo.rule.SelfDiagnosisRuleEngine; +import org.apache.tajo.rule.SelfDiagnosisRuleSession; import org.apache.tajo.util.NetUtils; import java.io.IOException; @@ -75,12 +80,26 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que 
super(conf, addr, baseDatabase); this.queryClient = new QueryClientImpl(this); this.catalogClient = new CatalogAdminClientImpl(this); + + diagnoseTajoClient(); } public TajoClientImpl(String hostName, int port, @Nullable String baseDatabase) throws IOException { super(hostName, port, baseDatabase); this.queryClient = new QueryClientImpl(this); this.catalogClient = new CatalogAdminClientImpl(this); + + diagnoseTajoClient(); + } + + private void diagnoseTajoClient() throws EvaluationFailedException { + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); + EvaluationContext context = new EvaluationContext(); + + context.addParameter(TajoConf.class.getName(), getConf()); + + ruleSession.withRuleNames("TajoConfValidationRule").fireRules(context); } /*------------------------------------------------------------------------*/ http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-common/pom.xml b/tajo-common/pom.xml index a98060d..7f14f71 100644 --- a/tajo-common/pom.xml +++ b/tajo-common/pom.xml @@ -110,6 +110,7 @@ </executions> <configuration> <excludes> + <exclude>src/main/resources/META-INF/services/*</exclude> <exclude>src/test/resources/org/apache/tajo/conf/**</exclude> </excludes> </configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index b00b322..a38bd6c 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -144,8 +144,7 @@ public class TajoConf extends Configuration { // a username for a 
running Tajo cluster ROOT_DIR("tajo.rootdir", "file:///tmp/tajo-${user.name}/", Validators.groups(Validators.notNull(), Validators.pathUrl())), - USERNAME("tajo.username", "${user.name}", - Validators.groups(Validators.notNull(), Validators.javaString())), + USERNAME("tajo.username", "${user.name}", Validators.javaString()), // Configurable System Directories WAREHOUSE_DIR("tajo.warehouse.directory", EMPTY_VALUE, Validators.pathUrl()), http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationContext.java b/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationContext.java new file mode 100644 index 0000000..5a03cb4 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationContext.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rule; + +import java.util.Map; + +import org.apache.tajo.util.TUtil; + +public class EvaluationContext { + + private final Map<String, Object> paramMap = TUtil.newHashMap(); + + public void clearParameters() { + paramMap.clear(); + } + + public void addParameter(String name, Object value) { + paramMap.put(name, value); + } + + public void addParameters(Map<String, Object> params) { + paramMap.putAll(params); + } + + public Object getParameter(String name) { + return getParameter(name, null); + } + + public Object getParameter(String name, Object defaultValue) { + Object returnValue = paramMap.get(name); + + if (returnValue == null) { + returnValue = defaultValue; + } + + return returnValue; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationFailedException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationFailedException.java b/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationFailedException.java new file mode 100644 index 0000000..469d37d --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationFailedException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rule; + +public class EvaluationFailedException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public EvaluationFailedException(String message, Throwable cause) { + super(message, cause); + } + + public EvaluationFailedException(String message) { + super(message); + } + + public EvaluationFailedException(Throwable cause) { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationResult.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationResult.java b/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationResult.java new file mode 100644 index 0000000..a2f4aff --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/EvaluationResult.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rule; + +public class EvaluationResult { + + private EvaluationResultCode returnCode; + private Throwable throwable; + private String message; + + public static enum EvaluationResultCode { + OK, + ERROR + } + + public EvaluationResultCode getReturnCode() { + return returnCode; + } + + public void setReturnCode(EvaluationResultCode returnCode) { + this.returnCode = returnCode; + } + + public Throwable getThrowable() { + return throwable; + } + + public void setThrowable(Throwable throwable) { + this.throwable = throwable; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRule.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRule.java b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRule.java new file mode 100644 index 0000000..bc1eae3 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRule.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rule; + +public interface SelfDiagnosisRule { + + public EvaluationResult evaluate(EvaluationContext context); + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleDefinition.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleDefinition.java b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleDefinition.java new file mode 100644 index 0000000..ae1d8ac --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleDefinition.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rule; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface SelfDiagnosisRuleDefinition { + + public String category() default "default"; + + public String name(); + + public int priority() default -1; + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleEngine.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleEngine.java b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleEngine.java new file mode 100644 index 0000000..12bd4f6 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleEngine.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rule; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; + +import org.apache.tajo.util.TUtil; + +public class SelfDiagnosisRuleEngine { + + private final Map<String, Map<String, RuleWrapper>> wrapperMap; + private static SelfDiagnosisRuleEngine instance; + + private SelfDiagnosisRuleEngine() { + wrapperMap = TUtil.newHashMap(); + loadPredefinedRules(); + } + + public static SelfDiagnosisRuleEngine getInstance() { + if (instance == null) { + synchronized (SelfDiagnosisRuleEngine.class) { + if (instance == null) { + instance = new SelfDiagnosisRuleEngine(); + } + } + } + return instance; + } + + public void reset() { + if (wrapperMap != null) { + wrapperMap.clear(); + } + loadPredefinedRules(); + } + + public SelfDiagnosisRuleSession newRuleSession() { + return new SelfDiagnosisRuleSession(this); + } + + protected Map<String, Map<String, RuleWrapper>> getRules() { + return wrapperMap; + } + + private void loadRuleData(List<SelfDiagnosisRule> ruleList) { + for (SelfDiagnosisRule rule: ruleList) { + RuleWrapper wrapper = new RuleWrapper(rule); + Map<String, RuleWrapper> categoryMap = wrapperMap.get(wrapper.getCategoryName()); + + if (categoryMap == null) { + categoryMap = TUtil.newHashMap(); + wrapperMap.put(wrapper.getCategoryName(), categoryMap); + } + + categoryMap.put(wrapper.getRuleName(), wrapper); + } + } + + protected void loadPredefinedRules() { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + ServiceLoader<SelfDiagnosisRuleProvider> serviceLoader = ServiceLoader.load(SelfDiagnosisRuleProvider.class, cl); + Iterator<SelfDiagnosisRuleProvider> iterator = serviceLoader.iterator(); + + wrapperMap.clear(); + while (iterator.hasNext()) { + SelfDiagnosisRuleProvider ruleProvider = iterator.next(); + loadRuleData(ruleProvider.getDefinedRules()); + } + } + + class RuleWrapper implements Comparable<RuleWrapper> { + private final 
String categoryName; + private final String ruleName; + private final int priority; + private final Class<?>[] acceptedCallers; + private final SelfDiagnosisRule rule; + + public RuleWrapper(SelfDiagnosisRule rule) { + this.rule = rule; + + SelfDiagnosisRuleDefinition ruleDefinition = rule.getClass().getAnnotation(SelfDiagnosisRuleDefinition.class); + if (ruleDefinition == null) { + throw new IllegalArgumentException(rule.getClass().getName() + " is not a valid runtime rule."); + } + categoryName = ruleDefinition.category(); + ruleName = ruleDefinition.name(); + priority = ruleDefinition.priority(); + + SelfDiagnosisRuleVisibility.LimitedPrivate limitedPrivateScope = + rule.getClass().getAnnotation(SelfDiagnosisRuleVisibility.LimitedPrivate.class); + if (limitedPrivateScope != null) { + acceptedCallers = + Arrays.copyOf(limitedPrivateScope.acceptedCallers(), + limitedPrivateScope.acceptedCallers().length); + } else { + acceptedCallers = new Class<?>[0]; + } + } + + public String getCategoryName() { + return categoryName; + } + + public String getRuleName() { + return ruleName; + } + + public Class<?>[] getAcceptedCallers() { + return acceptedCallers; + } + + public SelfDiagnosisRule getRule() { + return rule; + } + + public int getPriority() { + return priority; + } + + @Override + public int compareTo(RuleWrapper o) { + if (getPriority() == -1 && o.getPriority() == -1) { + return 0; + } else if (getPriority() == -1) { + return 1; + } else if (o.getPriority() == -1) { + return -1; + } + return (int) Math.signum(getPriority() - o.getPriority()); + } + + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleProvider.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleProvider.java b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleProvider.java new file mode 100644 index 
0000000..6679416 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleProvider.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rule; + +import java.util.List; + +public interface SelfDiagnosisRuleProvider { + + /** + * It will return a list of pre-defined rules. If it does not have, it will return a empty list. + * + * @return + */ + public List<SelfDiagnosisRule> getDefinedRules(); + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleSession.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleSession.java b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleSession.java new file mode 100644 index 0000000..a062974 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleSession.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rule; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; +import org.apache.tajo.rule.SelfDiagnosisRuleEngine.RuleWrapper; +import org.apache.tajo.util.TUtil; + +public class SelfDiagnosisRuleSession { + + private final SelfDiagnosisRuleEngine ruleEngine; + private final Set<String> categoryPredicate; + private final Set<String> rulePredicate; + + protected SelfDiagnosisRuleSession(SelfDiagnosisRuleEngine engine) { + ruleEngine = engine; + categoryPredicate = TUtil.newHashSet(); + rulePredicate = TUtil.newHashSet(); + } + + public SelfDiagnosisRuleSession withCategoryNames(String...categories) { + categoryPredicate.addAll(Arrays.asList(categories)); + return this; + } + + public SelfDiagnosisRuleSession withRuleNames(String...rules) { + rulePredicate.addAll(Arrays.asList(rules)); + return this; + } + + public SelfDiagnosisRuleSession reset() { + categoryPredicate.clear(); + rulePredicate.clear(); + return this; + } + + public void fireRules(EvaluationContext context) throws EvaluationFailedException { + List<RuleWrapper> candidateRules = getCandidateRules(); + + for (RuleWrapper wrapper: candidateRules) { + EvaluationResult result = wrapper.getRule().evaluate(context); + + if (result.getReturnCode() == 
EvaluationResultCode.ERROR) { + throw new EvaluationFailedException(result.getMessage(), result.getThrowable()); + } + } + } + + protected List<RuleWrapper> getCandidateRules() { + Map<String, Map<String, RuleWrapper>> wrapperMap = null; + List<RuleWrapper> candidateRules = TUtil.newList(); + + wrapperMap = ruleEngine.getRules(); + Class<?> callerClazz = getCallerClassName(); + + for (String categoryName: wrapperMap.keySet()) { + if (categoryPredicate.size() > 0 && !categoryPredicate.contains(categoryName)) { + continue; + } + + Map<String, RuleWrapper> ruleMap = wrapperMap.get(categoryName); + for (String ruleName: ruleMap.keySet()) { + if (rulePredicate.size() > 0 && !rulePredicate.contains(ruleName)) { + continue; + } + + RuleWrapper ruleWrapper = ruleMap.get(ruleName); + + if (callerClazz != null && ruleWrapper.getAcceptedCallers().length > 0 + && !hasCallerClazz(callerClazz, ruleWrapper.getAcceptedCallers())) { + continue; + } + + candidateRules.add(ruleWrapper); + } + } + + Collections.sort(candidateRules); + return candidateRules; + } + + protected boolean hasCallerClazz(Class<?> callerClazz, Class<?>[] acceptedCallers) { + boolean result = false; + + String callerClazzName = callerClazz.getName(); + for (Class<?> acceptedCaller: acceptedCallers) { + if (callerClazzName.equals(acceptedCaller.getName())) { + result = true; + break; + } + } + + return result; + } + + protected Class<?> getCallerClassName() { + return new RuleSessionSecurityManager().getCallerClassName(); + } + + class RuleSessionSecurityManager extends SecurityManager { + public Class<?> getCallerClassName() { + Class<?>[] clazzArray = getClassContext(); + int clazzIdx = 2; + for (; clazzIdx < clazzArray.length; clazzIdx++) { + if (!clazzArray[clazzIdx].getName().equals(SelfDiagnosisRuleSession.class.getName())) { + break; + } + } + return clazzIdx < clazzArray.length?clazzArray[clazzIdx]:null; + } + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleVisibility.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleVisibility.java b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleVisibility.java new file mode 100644 index 0000000..dc1ee5a --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/SelfDiagnosisRuleVisibility.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rule; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +public abstract class SelfDiagnosisRuleVisibility { + + @Target(ElementType.TYPE) + @Retention(RetentionPolicy.RUNTIME) + public @interface Public {} + + @Target(ElementType.TYPE) + @Retention(RetentionPolicy.RUNTIME) + public @interface LimitedPrivate { + Class<?>[] acceptedCallers(); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/base/BaseRuleProvider.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/base/BaseRuleProvider.java b/tajo-common/src/main/java/org/apache/tajo/rule/base/BaseRuleProvider.java new file mode 100644 index 0000000..2a8eed8 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/base/BaseRuleProvider.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rule.base; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rule.SelfDiagnosisRuleProvider; +import org.apache.tajo.rule.SelfDiagnosisRule; +import org.apache.tajo.util.ClassUtil; + +public class BaseRuleProvider implements SelfDiagnosisRuleProvider { + + private Log LOG = LogFactory.getLog(getClass()); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public List<SelfDiagnosisRule> getDefinedRules() { + Set<Class> classSet = ClassUtil.findClasses(SelfDiagnosisRule.class, + getClass().getPackage().getName()); + List<SelfDiagnosisRule> ruleList = new ArrayList<SelfDiagnosisRule>(classSet.size()); + + for (Class<SelfDiagnosisRule> ruleClazz: classSet) { + try { + ruleList.add(ruleClazz.newInstance()); + } catch (Exception e) { + LOG.warn("Cannot instantiate " + ruleClazz.getName() + " class."); + continue; + } + } + return ruleList; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/base/CheckHadoopRuntimeVersionRule.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/base/CheckHadoopRuntimeVersionRule.java b/tajo-common/src/main/java/org/apache/tajo/rule/base/CheckHadoopRuntimeVersionRule.java new file mode 100644 index 0000000..73feb10 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/base/CheckHadoopRuntimeVersionRule.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rule.base; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.rule.EvaluationContext; +import org.apache.tajo.rule.EvaluationResult; +import org.apache.tajo.rule.SelfDiagnosisRuleDefinition; +import org.apache.tajo.rule.SelfDiagnosisRuleVisibility; +import org.apache.tajo.rule.SelfDiagnosisRule; +import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; +import org.apache.tajo.validation.Validators; + +@SelfDiagnosisRuleDefinition(category="base", name = "CheckHadoopRuntimeVersionRule", priority = 0) +@SelfDiagnosisRuleVisibility.Public +public class CheckHadoopRuntimeVersionRule implements SelfDiagnosisRule { + + private Log LOG = LogFactory.getLog(getClass()); + private final Properties versionInfo; + + public CheckHadoopRuntimeVersionRule() { + InputStream is = ClassLoader.getSystemResourceAsStream("common-version-info.properties"); + versionInfo = new Properties(); + try { + versionInfo.load(is); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + } finally { + IOUtils.closeStream(is); + } + } + + private int[] getVersion() { + int[] version = new int[0]; + String versionString = versionInfo.getProperty("version"); + + if (versionString != null && 
!versionString.isEmpty()) { + Validators.patternMatch("\\d+\\.\\d+\\.\\d+").validate(versionString, true); + + String[] versionArray = versionString.split("\\."); + version = new int[versionArray.length]; + for (int idx = 0; idx < versionArray.length; idx++) { + version[idx] = Integer.parseInt(versionArray[idx]); + } + } + + return version; + } + + private int compareVersion(int[] left, int[] right) { + int returnValue = 0; + int minLength = Math.min(left.length, right.length); + + for (int idx = 0; idx < minLength; idx++) { + returnValue = (int) Math.signum(left[idx] - right[idx]); + if (returnValue != 0) { + break; + } + } + + if (returnValue == 0) { + returnValue = (int) Math.signum(left.length - right.length); + } + return returnValue; + } + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult evalResult = new EvaluationResult(); + + try { + int compared = compareVersion(getVersion(), new int[] {2, 3, 0}); + if (compared >= 0) { + evalResult.setReturnCode(EvaluationResultCode.OK); + evalResult.setMessage("Version test for hadoop common has passed."); + } else { + evalResult.setReturnCode(EvaluationResultCode.ERROR); + evalResult.setMessage("Checking the version of hadoop common component has failed.\n" + + "Current version : " + versionInfo.getProperty("version")); + } + } catch (Exception e) { + evalResult.setReturnCode(EvaluationResultCode.ERROR); + evalResult.setThrowable(e); + evalResult.setMessage("Checking the version of hadoop common component has failed."); + } + + return evalResult; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/java/org/apache/tajo/rule/base/TajoConfValidationRule.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/base/TajoConfValidationRule.java b/tajo-common/src/main/java/org/apache/tajo/rule/base/TajoConfValidationRule.java new file mode 100644 index 0000000..1babedc 
--- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/rule/base/TajoConfValidationRule.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rule.base; + +import java.util.Collection; +import java.util.Set; + +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.rule.EvaluationContext; +import org.apache.tajo.rule.EvaluationResult; +import org.apache.tajo.rule.SelfDiagnosisRuleVisibility; +import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; +import org.apache.tajo.rule.SelfDiagnosisRuleDefinition; +import org.apache.tajo.rule.SelfDiagnosisRule; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.validation.ConstraintViolation; +import org.apache.tajo.validation.ConstraintViolationException; +import org.apache.tajo.validation.Validator; + +@SelfDiagnosisRuleDefinition(category="base", name="TajoConfValidationRule", priority=0) [email protected] +public class TajoConfValidationRule implements SelfDiagnosisRule { + + private Collection<ConstraintViolation> isValidationTestPassed(TajoConf.ConfVars confVar, String varValue) { + Set<ConstraintViolation> violationSet = TUtil.newHashSet(); + + if (varValue != null && confVar.valueClass() 
!= null && confVar.validator() != null) { + Class<?> valueClazz = confVar.valueClass(); + Validator validator = confVar.validator(); + + if (Integer.class.getName().equals(valueClazz.getName())) { + int intValue = Integer.parseInt(varValue); + violationSet.addAll(validator.validate(intValue)); + } else if (Long.class.getName().equals(valueClazz.getName())) { + long longValue = Long.parseLong(varValue); + violationSet.addAll(validator.validate(longValue)); + } else if (Float.class.getName().equals(valueClazz.getName())) { + float floatValue = Float.parseFloat(varValue); + violationSet.addAll(validator.validate(floatValue)); + } else { + violationSet.addAll(validator.validate(varValue)); + } + } + + return violationSet; + } + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + Object tajoConfObj = context.getParameter(TajoConf.class.getName()); + result.setReturnCode(EvaluationResultCode.OK); + + if (tajoConfObj != null && tajoConfObj instanceof TajoConf) { + TajoConf tajoConf = (TajoConf) tajoConfObj; + + for (TajoConf.ConfVars confVar: TajoConf.ConfVars.values()) { + String varValue = tajoConf.get(confVar.keyname()); + Collection<ConstraintViolation> violationSet = isValidationTestPassed(confVar, varValue); + + if (violationSet.size() > 0) { + result.setReturnCode(EvaluationResultCode.ERROR); + result.setMessage("Validation Test has been failed on " + confVar.keyname() + + ". 
Actual value is " + varValue); + result.setThrowable(new ConstraintViolationException(violationSet)); + break; + } + } + } + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/main/resources/META-INF/services/org.apache.tajo.rule.SelfDiagnosisRuleProvider ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/resources/META-INF/services/org.apache.tajo.rule.SelfDiagnosisRuleProvider b/tajo-common/src/main/resources/META-INF/services/org.apache.tajo.rule.SelfDiagnosisRuleProvider new file mode 100644 index 0000000..d070efd --- /dev/null +++ b/tajo-common/src/main/resources/META-INF/services/org.apache.tajo.rule.SelfDiagnosisRuleProvider @@ -0,0 +1,2 @@ +# Specify a path for rule provider +org.apache.tajo.rule.base.BaseRuleProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/test/java/org/apache/tajo/rule/TestRuleEngine.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/rule/TestRuleEngine.java b/tajo-common/src/test/java/org/apache/tajo/rule/TestRuleEngine.java new file mode 100644 index 0000000..a490698 --- /dev/null +++ b/tajo-common/src/test/java/org/apache/tajo/rule/TestRuleEngine.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rule; + +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import java.util.Map; +import java.util.jar.Attributes; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; +import org.apache.tajo.rule.SelfDiagnosisRuleEngine.RuleWrapper; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestRuleEngine { + + private static Path testPath; + + @BeforeClass + public static void setUp() throws Exception { + testPath = CommonTestingUtil.getTestDir(); + } + + @AfterClass + public static void tearDown() throws Exception { + CommonTestingUtil.cleanupTestDir(testPath.toUri().getPath()); + } + + @Test + public void testLoadPredefinedRules() throws Exception { + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + ruleEngine.loadPredefinedRules(); + + assertThat(ruleEngine.getRules().size() > 0, is(true)); + } + + public static class TestRuleProvider1 implements SelfDiagnosisRuleProvider { + + @Override + public List<SelfDiagnosisRule> getDefinedRules() { + List<SelfDiagnosisRule> ruleList = TUtil.newList(new TestRule1(), new 
TestRule2()); + return ruleList; + } + + } + + @SelfDiagnosisRuleDefinition(category="test",name = "TestRule1") + @SelfDiagnosisRuleVisibility.Public + public static class TestRule1 implements SelfDiagnosisRule { + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + result.setReturnCode(EvaluationResultCode.OK); + result.setMessage("TestRule1 has passed."); + return result; + } + + } + + @SelfDiagnosisRuleDefinition(category="test",name = "TestRule2") + @SelfDiagnosisRuleVisibility.LimitedPrivate(acceptedCallers = { TestRuleEngine.class }) + public static class TestRule2 implements SelfDiagnosisRule { + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + result.setReturnCode(EvaluationResultCode.OK); + result.setMessage("TestRule2 has passed."); + return result; + } + + } + + protected Path createJarPathForTestRuleProvider1() throws Exception { + Path jarPath = new Path(testPath, "test-jar1.jar"); + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + JarOutputStream jarOut = new JarOutputStream(new FileOutputStream(new File(jarPath.toUri())), manifest); + + JarEntry entry = new JarEntry("META-INF/services/"+SelfDiagnosisRuleProvider.class.getName()); + jarOut.putNextEntry(entry); + jarOut.write(TestRuleProvider1.class.getName().getBytes()); + jarOut.closeEntry(); + jarOut.close(); + + return jarPath; + } + + @Test + public void testRuleForTestRuleProvider() throws Exception { + ClassLoader parent = Thread.currentThread().getContextClassLoader(); + URLClassLoader cl; + + cl = new URLClassLoader(new URL[] {createJarPathForTestRuleProvider1().toUri().toURL()}, parent); + Thread.currentThread().setContextClassLoader(cl); + + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + ruleEngine.loadPredefinedRules(); + + Map<String, 
Map<String, RuleWrapper>> wrapperMap = ruleEngine.getRules(); + Map<String, RuleWrapper> testMap = wrapperMap.get("test"); + assertThat(testMap, is(notNullValue())); + assertThat(testMap.size(), is(2)); + for (String ruleName: testMap.keySet()) { + assertThat(ruleName, anyOf(is("TestRule1"), is("TestRule2"))); + } + + try { + Method closeMethod = URLClassLoader.class.getMethod("close"); + closeMethod.invoke(cl); + } catch (NoSuchMethodException ignored) { + } + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-common/src/test/java/org/apache/tajo/rule/TestRuleSession.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/rule/TestRuleSession.java b/tajo-common/src/test/java/org/apache/tajo/rule/TestRuleSession.java new file mode 100644 index 0000000..5691911 --- /dev/null +++ b/tajo-common/src/test/java/org/apache/tajo/rule/TestRuleSession.java @@ -0,0 +1,465 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rule; + +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import java.util.jar.Attributes; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; +import org.apache.tajo.rule.SelfDiagnosisRuleEngine.RuleWrapper; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestRuleSession { + + private static Path testPath; + + @BeforeClass + public static void setUp() throws Exception { + testPath = CommonTestingUtil.getTestDir(); + } + + @AfterClass + public static void tearDown() throws Exception { + CommonTestingUtil.cleanupTestDir(testPath.toUri().getPath()); + } + + @Test + public void testGetCallerClassName() throws Exception { + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + ruleEngine.reset(); + SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); + + assertThat(ruleSession.getCallerClassName(), is(notNullValue())); + assertThat(ruleSession.getCallerClassName().getName(), is(TestRuleSession.class.getName())); + } + + public static class TestRuleSessionProvider implements SelfDiagnosisRuleProvider { + + @Override + public List<SelfDiagnosisRule> getDefinedRules() { + List<SelfDiagnosisRule> ruleList = TUtil.newList(); + ruleList.add(new TestRule1()); + ruleList.add(new TestRule2()); + ruleList.add(new TestRule3()); + return ruleList; + } + + } + + @SelfDiagnosisRuleDefinition(category="test1",name = "TestRule1") + @SelfDiagnosisRuleVisibility.Public + public static class TestRule1 implements SelfDiagnosisRule { + 
+ @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + result.setReturnCode(EvaluationResultCode.OK); + result.setMessage("TestRule1 has passed."); + return result; + } + + } + + @SelfDiagnosisRuleDefinition(category="test1",name = "TestRule2") + @SelfDiagnosisRuleVisibility.LimitedPrivate(acceptedCallers = { TestRuleSession.class }) + public static class TestRule2 implements SelfDiagnosisRule { + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + result.setReturnCode(EvaluationResultCode.OK); + result.setMessage("TestRule2 has passed."); + return result; + } + + } + + @SelfDiagnosisRuleDefinition(category="test1",name = "TestRule3") + @SelfDiagnosisRuleVisibility.LimitedPrivate(acceptedCallers = { TestRuleEngine.class }) + public static class TestRule3 implements SelfDiagnosisRule { + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + result.setReturnCode(EvaluationResultCode.OK); + result.setMessage("TestRule3 has passed."); + return result; + } + + } + + protected Path createServiceJar(String className) throws Exception { + Path jarPath = new Path(testPath, className+".jar"); + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + JarOutputStream jarOut = new JarOutputStream(new FileOutputStream(new File(jarPath.toUri())), manifest); + + JarEntry entry = new JarEntry("META-INF/services/"+SelfDiagnosisRuleProvider.class.getName()); + jarOut.putNextEntry(entry); + jarOut.write(className.getBytes()); + jarOut.closeEntry(); + jarOut.close(); + + return jarPath; + } + + protected Path createJarPathForTestRuleSession() throws Exception { + return createServiceJar(TestRuleSessionProvider.class.getName()); + } + + @Test + public void testGetCandidateRules() throws Exception { + 
ClassLoader parent = Thread.currentThread().getContextClassLoader(); + URLClassLoader cl; + + cl = new URLClassLoader(new URL[] {createJarPathForTestRuleSession().toUri().toURL()}, parent); + Thread.currentThread().setContextClassLoader(cl); + + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + ruleEngine.reset(); + SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); + List<RuleWrapper> candidateRules = ruleSession.withCategoryNames("test1").getCandidateRules(); + + assertThat(candidateRules, is(notNullValue())); + assertThat(candidateRules.size() == 2, is(true)); + + for (RuleWrapper wrapper: candidateRules) { + assertThat(wrapper.getRuleName(), anyOf(is("TestRule1"), is("TestRule2"))); + } + + candidateRules = ruleSession.withRuleNames("TestRule1").getCandidateRules(); + + assertThat(candidateRules, is(notNullValue())); + assertThat(candidateRules.size() == 1, is(true)); + + for (RuleWrapper wrapper: candidateRules) { + assertThat(wrapper.getRuleName(), is("TestRule1")); + } + + try { + Method closeMethod = URLClassLoader.class.getMethod("close"); + closeMethod.invoke(cl); + } catch (NoSuchMethodException ignored) { + } + } + + @Test + public void testReset() throws Exception { + ClassLoader parent = Thread.currentThread().getContextClassLoader(); + URLClassLoader cl; + + cl = new URLClassLoader(new URL[] {createJarPathForTestRuleSession().toUri().toURL()}, parent); + Thread.currentThread().setContextClassLoader(cl); + + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + ruleEngine.reset(); + SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); + List<RuleWrapper> candidateRules = ruleSession.withCategoryNames("test1") + .withRuleNames("TestRule1").getCandidateRules(); + + assertThat(candidateRules, is(notNullValue())); + assertThat(candidateRules.size() == 1, is(true)); + for (RuleWrapper wrapper: candidateRules) { + assertThat(wrapper.getRuleName(), is("TestRule1")); + } + + 
candidateRules = ruleSession.reset().withCategoryNames("test1").getCandidateRules(); + assertThat(candidateRules, is(notNullValue())); + assertThat(candidateRules.size() == 2, is(true)); + + for (RuleWrapper wrapper: candidateRules) { + assertThat(wrapper.getRuleName(), anyOf(is("TestRule1"), is("TestRule2"))); + } + + try { + Method closeMethod = URLClassLoader.class.getMethod("close"); + closeMethod.invoke(cl); + } catch (NoSuchMethodException ignored) { + } + } + + public static class TestRulePriorityProvider implements SelfDiagnosisRuleProvider { + + @Override + public List<SelfDiagnosisRule> getDefinedRules() { + List<SelfDiagnosisRule> ruleList = TUtil.newList(); + ruleList.add(new TestPriorityRule1()); + ruleList.add(new TestPriorityRule2()); + ruleList.add(new TestPriorityRule3()); + ruleList.add(new TestPriorityRule4()); + return ruleList; + } + + } + + @SelfDiagnosisRuleDefinition(category="test2",name = "TestPriorityRule1") + @SelfDiagnosisRuleVisibility.Public + public static class TestPriorityRule1 implements SelfDiagnosisRule { + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + result.setReturnCode(EvaluationResultCode.OK); + result.setMessage("TestPriorityRule1 has passed."); + return result; + } + + } + + @SelfDiagnosisRuleDefinition(category="test2",name = "TestPriorityRule2") + @SelfDiagnosisRuleVisibility.Public + public static class TestPriorityRule2 implements SelfDiagnosisRule { + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + result.setReturnCode(EvaluationResultCode.OK); + result.setMessage("TestPriorityRule2 has passed."); + return result; + } + + } + + @SelfDiagnosisRuleDefinition(category="test2",name = "TestPriorityRule3",priority=0) + @SelfDiagnosisRuleVisibility.Public + public static class TestPriorityRule3 implements SelfDiagnosisRule { + + @Override + public 
EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + result.setReturnCode(EvaluationResultCode.OK); + result.setMessage("TestPriorityRule3 has passed."); + return result; + } + + } + + @SelfDiagnosisRuleDefinition(category="test2",name = "TestPriorityRule4",priority=10) + @SelfDiagnosisRuleVisibility.Public + public static class TestPriorityRule4 implements SelfDiagnosisRule { + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + result.setReturnCode(EvaluationResultCode.OK); + result.setMessage("TestPriorityRule4 has passed."); + return result; + } + + } + + protected Path createJarPathForRulePriority() throws Exception { + return createServiceJar(TestRulePriorityProvider.class.getName()); + } + + @Test + public void testRulePriority() throws Exception { + ClassLoader parent = Thread.currentThread().getContextClassLoader(); + URLClassLoader cl; + + cl = new URLClassLoader(new URL[] {createJarPathForRulePriority().toUri().toURL()}, parent); + Thread.currentThread().setContextClassLoader(cl); + + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + ruleEngine.reset(); + SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); + List<RuleWrapper> candidateRules = ruleSession.withCategoryNames("test2").getCandidateRules(); + + assertThat(candidateRules, is(notNullValue())); + assertThat(candidateRules.size() == 4, is(true)); + assertThat(candidateRules.get(0).getRuleName(), is("TestPriorityRule3")); + assertThat(candidateRules.get(1).getRuleName(), is("TestPriorityRule4")); + + candidateRules = ruleSession.withRuleNames("TestPriorityRule1", "TestPriorityRule2", "TestPriorityRule4") + .getCandidateRules(); + + assertThat(candidateRules, is(notNullValue())); + assertThat(candidateRules.size() == 3, is(true)); + assertThat(candidateRules.get(0).getRuleName(), is("TestPriorityRule4")); + + try { + Method 
closeMethod = URLClassLoader.class.getMethod("close"); + closeMethod.invoke(cl); + } catch (NoSuchMethodException ignored) { + } + } + + public static class TestExecutionRuleProvider implements SelfDiagnosisRuleProvider { + + @Override + public List<SelfDiagnosisRule> getDefinedRules() { + List<SelfDiagnosisRule> ruleList = TUtil.newList(); + ruleList.add(new TestExecRule1()); + ruleList.add(new TestExecRule2()); + ruleList.add(new TestExecRule3()); + return ruleList; + } + + } + + @SelfDiagnosisRuleDefinition(category="test3",name="TestExecRule1",priority=0) + @SelfDiagnosisRuleVisibility.Public + public static class TestExecRule1 implements SelfDiagnosisRule { + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + Object rule1_param1 = context.getParameter("TestExecRule1_param1"); + try { + if (rule1_param1 != null && rule1_param1 instanceof Integer) { + int rule1_param1_intVal = Integer.parseInt(rule1_param1.toString()); + if (rule1_param1_intVal == 0) { + result.setReturnCode(EvaluationResultCode.ERROR); + result.setMessage("parameter1 is 0."); + } else { + result.setReturnCode(EvaluationResultCode.OK); + } + } else { + result.setReturnCode(EvaluationResultCode.ERROR); + result.setMessage("parameter1 is null or not a integer type."); + } + } catch (NumberFormatException e) { + result.setReturnCode(EvaluationResultCode.ERROR); + result.setMessage(e.getMessage()); + result.setThrowable(e); + } + return result; + } + + } + + @SelfDiagnosisRuleDefinition(category="test3",name="TestExecRule2",priority=1) + @SelfDiagnosisRuleVisibility.Public + public static class TestExecRule2 implements SelfDiagnosisRule { + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + Object rule2_param1 = context.getParameter("TestExecRule2_param1"); + if (rule2_param1 == null) { + result.setReturnCode(EvaluationResultCode.ERROR); + 
result.setMessage("parameter1 is null."); + } else { + result.setReturnCode(EvaluationResultCode.OK); + } + return result; + } + + } + + @SelfDiagnosisRuleDefinition(category="test3",name="TestExecRule3",priority=2) + @SelfDiagnosisRuleVisibility.Public + public static class TestExecRule3 implements SelfDiagnosisRule { + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + Object rule3_param1 = context.getParameter("TestExecRule3_param1"); + + if (rule3_param1 != null && rule3_param1 instanceof String) { + String rule3_param1_string = (String) rule3_param1; + if (rule3_param1_string.startsWith("test")) { + result.setReturnCode(EvaluationResultCode.OK); + } else { + result.setReturnCode(EvaluationResultCode.ERROR); + result.setMessage("parameter1 does not start with 'test'."); + } + } else { + result.setReturnCode(EvaluationResultCode.ERROR); + result.setMessage("parameter1 is null or not a string type."); + } + return result; + } + + } + + protected Path createJarPathForExecutingRules() throws Exception { + return createServiceJar(TestExecutionRuleProvider.class.getName()); + } + + @Test + public void testFireRules() throws Exception { + ClassLoader parent = Thread.currentThread().getContextClassLoader(); + URLClassLoader cl; + + cl = new URLClassLoader(new URL[] {createJarPathForExecutingRules().toUri().toURL()}, parent); + Thread.currentThread().setContextClassLoader(cl); + + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + ruleEngine.reset(); + SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); + EvaluationContext context = new EvaluationContext(); + + context.addParameter("TestExecRule1_param1", (int)5); + context.addParameter("TestExecRule2_param1", "rule2"); + context.addParameter("TestExecRule3_param1", "testResult"); + + ruleSession.withCategoryNames("test3").fireRules(context); + + try { + Method closeMethod = 
URLClassLoader.class.getMethod("close"); + closeMethod.invoke(cl); + } catch (NoSuchMethodException ignored) { + } + } + + @Test(expected=EvaluationFailedException.class) + public void testFireRulesWithException() throws Exception { + ClassLoader parent = Thread.currentThread().getContextClassLoader(); + URLClassLoader cl; + + cl = new URLClassLoader(new URL[] {createJarPathForExecutingRules().toUri().toURL()}, parent); + Thread.currentThread().setContextClassLoader(cl); + + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + ruleEngine.reset(); + SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); + EvaluationContext context = new EvaluationContext(); + + context.addParameter("TestExecRule1_param1", (int)0); + context.addParameter("TestExecRule2_param1", "rule2"); + context.addParameter("TestExecRule3_param1", "testResult"); + + try { + ruleSession.withCategoryNames("test3").fireRules(context); + } finally { + try { + Method closeMethod = URLClassLoader.class.getMethod("close"); + closeMethod.invoke(cl); + } catch (NoSuchMethodException ignored) { + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-core/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index 04706b6..7322219 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -78,6 +78,7 @@ <exclude>src/test/resources/queries/**</exclude> <exclude>src/test/resources/results/**</exclude> <exclude>src/test/resources/results/**</exclude> + <exclude>src/main/resources/META-INF/services/*</exclude> <exclude>src/main/resources/webapps/static/js/*</exclude> </excludes> </configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 795983d..604cfe0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -49,6 +49,10 @@ import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.master.session.SessionManager; import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.rule.EvaluationContext; +import org.apache.tajo.rule.EvaluationFailedException; +import org.apache.tajo.rule.SelfDiagnosisRuleEngine; +import org.apache.tajo.rule.SelfDiagnosisRuleSession; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.util.*; import org.apache.tajo.util.history.HistoryReader; @@ -166,6 +170,7 @@ public class TajoMaster extends CompositeService { // check the system directory and create if they are not created. checkAndInitializeSystemDirectories(); + diagnoseTajoMaster(); this.storeManager = StorageManager.getStorageManager(systemConf); catalogServer = new CatalogServer(FunctionLoader.load()); @@ -278,6 +283,16 @@ public class TajoMaster extends CompositeService { LOG.info("Staging dir '" + stagingPath + "' is created"); } } + + /** Startup self-diagnosis: registers systemConf in a fresh EvaluationContext under the key TajoConf.class.getName() and fires the rules of the "base" and "master" categories. Declared to throw EvaluationFailedException (presumably raised by fireRules when a rule reports a failure; confirm against SelfDiagnosisRuleSession). */ private void diagnoseTajoMaster() throws EvaluationFailedException { + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); + EvaluationContext context = new EvaluationContext(); + + context.addParameter(TajoConf.class.getName(), systemConf); + + ruleSession.withCategoryNames("base", "master").fireRules(context); + } private void startJvmPauseMonitor(){ pauseMonitor = new JvmPauseMonitor(systemConf); http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-core/src/main/java/org/apache/tajo/master/rule/FileSystemRule.java ---------------------------------------------------------------------- diff --git
a/tajo-core/src/main/java/org/apache/tajo/master/rule/FileSystemRule.java b/tajo-core/src/main/java/org/apache/tajo/master/rule/FileSystemRule.java new file mode 100644 index 0000000..ea97462 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/rule/FileSystemRule.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.rule; + +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.rule.EvaluationContext; +import org.apache.tajo.rule.EvaluationResult; +import org.apache.tajo.rule.SelfDiagnosisRuleDefinition; +import org.apache.tajo.rule.SelfDiagnosisRuleVisibility; +import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; +import org.apache.tajo.rule.SelfDiagnosisRule; + +@SelfDiagnosisRuleDefinition(category="master", name="FileSystemRule") +@SelfDiagnosisRuleVisibility.LimitedPrivate(acceptedCallers = { TajoMaster.class }) +public class FileSystemRule implements SelfDiagnosisRule { + + private void canAccessToPath(FileStatus fsStatus, FsAction action) throws Exception { + FsPermission permission = fsStatus.getPermission(); + UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser(); + String userName = userGroupInformation.getShortUserName(); + List<String> groupList = Arrays.asList(userGroupInformation.getGroupNames()); + + if (userName.equals(fsStatus.getOwner())) { + if (permission.getUserAction().implies(action)) { + return; + } + } else if (groupList.contains(fsStatus.getGroup())) { + if (permission.getGroupAction().implies(action)) { + return; + } + } else { + if (permission.getOtherAction().implies(action)) { + return; + } + } + throw new AccessControlException(String.format( + "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", userName, fsStatus.getPath(), + fsStatus.getOwner(), fsStatus.getGroup(), fsStatus.isDirectory() ? 
"d" : "-", permission)); + } + + private void checkAccessControlOnTajoPaths(TajoConf tajoConf) throws Exception { + Path tajoRootPath = TajoConf.getTajoRootDir(tajoConf); + FileSystem defaultFs = tajoRootPath.getFileSystem(tajoConf); + canAccessToPath(defaultFs.getFileStatus(tajoRootPath), FsAction.READ_WRITE); + + Path systemPath = TajoConf.getSystemDir(tajoConf); + canAccessToPath(defaultFs.getFileStatus(systemPath), FsAction.READ_WRITE); + + Path systemResourcePath = TajoConf.getSystemResourceDir(tajoConf); + canAccessToPath(defaultFs.getFileStatus(systemResourcePath), FsAction.READ_WRITE); + + Path wareHousePath = TajoConf.getWarehouseDir(tajoConf); + canAccessToPath(defaultFs.getFileStatus(wareHousePath), FsAction.READ_WRITE); + + Path stagingPath = TajoConf.getDefaultRootStagingDir(tajoConf); + canAccessToPath(defaultFs.getFileStatus(stagingPath), FsAction.READ_WRITE); + } + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + EvaluationResult result = new EvaluationResult(); + Object tajoConfObj = context.getParameter(TajoConf.class.getName()); + + if (tajoConfObj != null && tajoConfObj instanceof TajoConf) { + TajoConf tajoConf = (TajoConf) tajoConfObj; + try { + checkAccessControlOnTajoPaths(tajoConf); + + result.setReturnCode(EvaluationResultCode.OK); + } catch (Exception e) { + result.setReturnCode(EvaluationResultCode.ERROR); + result.setMessage("Current User cannot access to this filesystem."); + result.setThrowable(e); + } + } else { + result.setReturnCode(EvaluationResultCode.ERROR); + result.setMessage("Tajo Configuration is null or not a Configuration Type."); + } + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-core/src/main/java/org/apache/tajo/master/rule/MasterRuleProvider.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rule/MasterRuleProvider.java 
b/tajo-core/src/main/java/org/apache/tajo/master/rule/MasterRuleProvider.java new file mode 100644 index 0000000..0caedb9 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/rule/MasterRuleProvider.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.rule; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rule.SelfDiagnosisRuleProvider; +import org.apache.tajo.rule.SelfDiagnosisRule; +import org.apache.tajo.util.ClassUtil; + +public class MasterRuleProvider implements SelfDiagnosisRuleProvider { + + private Log LOG = LogFactory.getLog(getClass()); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public List<SelfDiagnosisRule> getDefinedRules() { + Set<Class> classSet = ClassUtil.findClasses(SelfDiagnosisRule.class, + getClass().getPackage().getName()); + List<SelfDiagnosisRule> ruleList = new ArrayList<SelfDiagnosisRule>(classSet.size()); + + for (Class<SelfDiagnosisRule> ruleClazz: classSet) { + try { + ruleList.add(ruleClazz.newInstance()); + } catch (Exception e) { + LOG.warn("Cannot instantiate " + ruleClazz.getName() + " class."); + continue; + } + } + return ruleList; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index c24eef1..0f0c1f9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -20,6 +20,7 @@ package org.apache.tajo.worker; import com.codahale.metrics.Gauge; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,6 +44,10 @@ import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.pullserver.TajoPullServerService; import 
org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.rule.EvaluationContext; +import org.apache.tajo.rule.EvaluationFailedException; +import org.apache.tajo.rule.SelfDiagnosisRuleEngine; +import org.apache.tajo.rule.SelfDiagnosisRuleSession; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.util.*; import org.apache.tajo.util.history.HistoryReader; @@ -262,6 +267,8 @@ public class TajoWorker extends CompositeService { taskHistoryWriter.init(conf); historyReader = new HistoryReader(workerContext.getWorkerName(), this.systemConf); + + diagnoseTajoWorker(); } private void initWorkerMetrics() { @@ -315,6 +322,16 @@ public class TajoWorker extends CompositeService { getWorkerContext().cleanupTemporalDirectories(); } } + + private void diagnoseTajoWorker() throws EvaluationFailedException { + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); + EvaluationContext context = new EvaluationContext(); + + context.addParameter(TajoConf.class.getName(), systemConf); + + ruleSession.withCategoryNames("base", "worker").fireRules(context); + } private void startJvmPauseMonitor(){ pauseMonitor = new JvmPauseMonitor(systemConf); http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java new file mode 100644 index 0000000..dcd40bf --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.rule; + +import java.net.InetSocketAddress; + +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rule.EvaluationContext; +import org.apache.tajo.rule.EvaluationResult; +import org.apache.tajo.rule.SelfDiagnosisRuleDefinition; +import org.apache.tajo.rule.SelfDiagnosisRuleVisibility; +import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; +import org.apache.tajo.rule.SelfDiagnosisRule; +import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.util.NetUtils; +import org.apache.tajo.worker.TajoWorker; + +/** + * With this rule, Tajo worker will check the connectivity to tajo master server. 
+ */ +@SelfDiagnosisRuleDefinition(category="worker", name="ConnectivityCheckerRuleForTajoWorker", priority=0) +@SelfDiagnosisRuleVisibility.LimitedPrivate(acceptedCallers = { TajoWorker.class }) +public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule { + + private void checkTajoMasterConnectivity(TajoConf tajoConf) throws Exception { + RpcConnectionPool pool = RpcConnectionPool.getPool(tajoConf); + NettyClientBase masterClient = null; + InetSocketAddress masterAddress = null; + + try { + if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + masterAddress = HAServiceUtil.getMasterUmbilicalAddress(tajoConf); + } else { + masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS)); + } + masterClient = pool.getConnection(masterAddress, TajoMasterProtocol.class, true); + + masterClient.getStub(); + } finally { + if (masterClient != null) { + pool.releaseConnection(masterClient); + } + } + + } + + @Override + public EvaluationResult evaluate(EvaluationContext context) { + Object tajoConfObj = context.getParameter(TajoConf.class.getName()); + EvaluationResult result = new EvaluationResult(); + + if (tajoConfObj != null && tajoConfObj instanceof TajoConf) { + TajoConf tajoConf = (TajoConf) tajoConfObj; + try { + checkTajoMasterConnectivity(tajoConf); + + result.setReturnCode(EvaluationResultCode.OK); + } catch (Exception e) { + result.setReturnCode(EvaluationResultCode.ERROR); + result.setMessage(e.getMessage()); + result.setThrowable(e); + } + } else { + result.setReturnCode(EvaluationResultCode.ERROR); + result.setMessage("WorkerContext is null or not a WorkerContext type."); + } + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-core/src/main/java/org/apache/tajo/worker/rule/WorkerRuleProvider.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/WorkerRuleProvider.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/rule/WorkerRuleProvider.java new file mode 100644 index 0000000..4094efd --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/WorkerRuleProvider.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.worker.rule; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rule.SelfDiagnosisRuleProvider; +import org.apache.tajo.rule.SelfDiagnosisRule; +import org.apache.tajo.util.ClassUtil; + +public class WorkerRuleProvider implements SelfDiagnosisRuleProvider { + + private Log LOG = LogFactory.getLog(getClass()); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public List<SelfDiagnosisRule> getDefinedRules() { + Set<Class> classSet = ClassUtil.findClasses(SelfDiagnosisRule.class, + getClass().getPackage().getName()); + List<SelfDiagnosisRule> ruleList = new ArrayList<SelfDiagnosisRule>(classSet.size()); + + for (Class<SelfDiagnosisRule> ruleClazz: classSet) { + try { + ruleList.add(ruleClazz.newInstance()); + } catch (Exception e) { + LOG.warn("Cannot instantiate " + ruleClazz.getName() + " class."); + continue; + } + } + return ruleList; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a0d67bb6/tajo-core/src/main/resources/META-INF/services/org.apache.tajo.rule.SelfDiagnosisRuleProvider ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/META-INF/services/org.apache.tajo.rule.SelfDiagnosisRuleProvider b/tajo-core/src/main/resources/META-INF/services/org.apache.tajo.rule.SelfDiagnosisRuleProvider new file mode 100644 index 0000000..ab5f689 --- /dev/null +++ b/tajo-core/src/main/resources/META-INF/services/org.apache.tajo.rule.SelfDiagnosisRuleProvider @@ -0,0 +1,3 @@ +# Specify a path for rule provider +org.apache.tajo.master.rule.MasterRuleProvider +org.apache.tajo.worker.rule.WorkerRuleProvider \ No newline at end of file
