http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/RunningTaskClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/RunningTaskClr2Java.cpp b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/RunningTaskClr2Java.cpp new file mode 100644 index 0000000..8ca1e65 --- /dev/null +++ b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/RunningTaskClr2Java.cpp @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+#include "Clr2JavaImpl.h"
+
+namespace Org {
+  namespace Apache {
+    namespace Reef {
+      namespace Driver {
+        namespace Bridge {
+          // File-local holder for the logger shared by every method in this file.
+          ref class ManagedLog {
+            internal:
+              static BridgeLogger^ LOGGER = BridgeLogger::GetLogger("<C++>");
+          };
+
+          // Wraps the Java running-task bridge object for use from managed code.
+          // Stores the JavaVM in _jvm, a global reference to the Java object in
+          // _jobjectRunningTask, and a global reference to the string returned by
+          // the object's getId() method in _jstringId.
+          // A GetJavaVM failure is only logged; construction continues.
+          RunningTaskClr2Java::RunningTaskClr2Java(JNIEnv *env, jobject jobjectRunningTask) {
+            ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::RunningTaskClr2Java");
+
+            pin_ptr<JavaVM*> pJavaVm = &_jvm;
+            if (env->GetJavaVM(pJavaVm) != 0) {
+              ManagedLog::LOGGER->LogError("Failed to get JavaVM", nullptr);
+            }
+            _jobjectRunningTask = reinterpret_cast<jobject>(env->NewGlobalRef(jobjectRunningTask));
+
+            jclass jclassRunningTask = env->GetObjectClass (_jobjectRunningTask);
+            jmethodID jmidGetId = env->GetMethodID(jclassRunningTask, "getId", "()Ljava/lang/String;");
+
+            _jstringId = reinterpret_cast<jstring>(env->NewGlobalRef(env -> CallObjectMethod(_jobjectRunningTask, jmidGetId)));
+            ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::RunningTaskClr2Java");
+          }
+
+          // Reads the "jactiveContext" field (an ActiveContextBridge) of the wrapped
+          // Java object and returns it wrapped in a new ActiveContextClr2Java.
+          IActiveContextClr2Java^ RunningTaskClr2Java::GetActiveContext() {
+            ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::GetActiveContext");
+
+            JNIEnv *env = RetrieveEnv(_jvm);
+
+            jclass jclassRunningTask = env->GetObjectClass(_jobjectRunningTask);
+            jfieldID jidActiveContext = env->GetFieldID(jclassRunningTask, "jactiveContext", "Lorg/apache/reef/javabridge/ActiveContextBridge;");
+            jobject jobjectActiveContext = env->GetObjectField(_jobjectRunningTask, jidActiveContext);
+            ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::GetActiveContext");
+
+            return gcnew ActiveContextClr2Java(env, jobjectActiveContext);
+          }
+
+          // Returns the id string cached by the constructor, converted to a managed String.
+          String^ RunningTaskClr2Java::GetId() {
+            ManagedLog::LOGGER->Log("RunningTaskClr2Java::GetId");
+            JNIEnv *env = RetrieveEnv(_jvm);
+            return ManagedStringFromJavaString(env, _jstringId);
+          }
+
+          // Passes the managed byte array to the Java object's send(byte[]) method.
+          // If the method cannot be resolved, the failure is logged and nothing is sent.
+          void RunningTaskClr2Java::Send(array<byte>^ message) {
+            ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::Send");
+            JNIEnv *env = RetrieveEnv(_jvm);
+            jclass jclassRunningTask = env->GetObjectClass(_jobjectRunningTask);
+            jmethodID jmidSend = env->GetMethodID(jclassRunningTask, "send", "([B)V");
+
+            if (jmidSend == NULL) {
+              ManagedLog::LOGGER->Log("jmidSend is NULL");
+              return;
+            }
+            // The signature "([B)V" declares a void return, so the call must go through
+            // CallVoidMethod; the Call<type>Method variant has to match the return type.
+            env->CallVoidMethod(
+              _jobjectRunningTask,
+              jmidSend,
+              JavaByteArrayFromManagedByteArray(env, message));
+            ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Send");
+          }
+
+          // Forwards a CLR-side error message to HandleClr2JavaError together with
+          // the wrapped Java object.
+          void RunningTaskClr2Java::OnError(String^ message) {
+            ManagedLog::LOGGER->Log("RunningTaskClr2Java::OnError");
+            JNIEnv *env = RetrieveEnv(_jvm);
+            HandleClr2JavaError(env, message, _jobjectRunningTask);
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/SuspendedTaskClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/SuspendedTaskClr2Java.cpp b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/SuspendedTaskClr2Java.cpp new file mode 100644 index 0000000..a10f88e --- /dev/null +++ b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/SuspendedTaskClr2Java.cpp @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include "Clr2JavaImpl.h" + +namespace Org { + namespace Apache { + namespace Reef { + namespace Driver { + namespace Bridge { + ref class ManagedLog { + internal: + static BridgeLogger^ LOGGER = BridgeLogger::GetLogger("<C++>"); + }; + + SuspendedTaskClr2Java::SuspendedTaskClr2Java(JNIEnv *env, jobject jobjectSuspendedTask) { + ManagedLog::LOGGER->LogStart("SuspendedTaskClr2Java::SuspendedTaskClr2Java"); + pin_ptr<JavaVM*> pJavaVm = &_jvm; + if (env->GetJavaVM(pJavaVm) != 0) { + ManagedLog::LOGGER->LogError("Failed to get JavaVM", nullptr); + } + _jobjectSuspendedTask = reinterpret_cast<jobject>(env->NewGlobalRef(jobjectSuspendedTask)); + + jclass jclassSuspendedTask = env->GetObjectClass (_jobjectSuspendedTask); + jfieldID jidTaskId = env->GetFieldID(jclassSuspendedTask, "taskId", "Ljava/lang/String;"); + _jstringId = reinterpret_cast<jstring>(env->NewGlobalRef(env->GetObjectField(_jobjectSuspendedTask, jidTaskId))); + ManagedLog::LOGGER->LogStop("SuspendedTaskClr2Java::SuspendedTaskClr2Java"); + } + + IActiveContextClr2Java^ SuspendedTaskClr2Java::GetActiveContext() { + ManagedLog::LOGGER->LogStart("SuspendedTaskClr2Java::GetActiveContext"); + JNIEnv *env = RetrieveEnv(_jvm); + + jclass jclassSuspendedTask = env->GetObjectClass (_jobjectSuspendedTask); + jfieldID jidActiveContext = env->GetFieldID(jclassSuspendedTask, "jactiveContext", "Lorg/apache/reef/javabridge/ActiveContextBridge;"); + jobject jobjectActiveContext = env->GetObjectField(_jobjectSuspendedTask, jidActiveContext); + ManagedLog::LOGGER->LogStop("SuspendedTaskClr2Java::GetActiveContext"); + return gcnew ActiveContextClr2Java(env, jobjectActiveContext); + } + + String^ SuspendedTaskClr2Java::GetId() { + ManagedLog::LOGGER->Log("SuspendedTaskClr2Java::GetId"); + JNIEnv *env = RetrieveEnv(_jvm); + return ManagedStringFromJavaString(env, _jstringId); + } + + array<byte>^ SuspendedTaskClr2Java::Get() { + ManagedLog::LOGGER->Log("SuspendedTaskClr2Java::Get"); + JNIEnv *env = RetrieveEnv(_jvm); + 
jclass jclassSuspendedTask = env->GetObjectClass (_jobjectSuspendedTask); + jmethodID jmidGet = env->GetMethodID(jclassSuspendedTask, "get", "()[B"); + + if (jmidGet == NULL) { + ManagedLog::LOGGER->Log("jmidGet is NULL"); + return nullptr; + } + jbyteArray jMessage = (jbyteArray) env->CallObjectMethod(_jobjectSuspendedTask, jmidGet); + return ManagedByteArrayFromJavaByteArray(env, jMessage); + } + + void SuspendedTaskClr2Java::OnError(String^ message) { + ManagedLog::LOGGER->Log("SuspendedTaskClr2Java::OnError"); + JNIEnv *env = RetrieveEnv(_jvm); + HandleClr2JavaError(env, message, _jobjectSuspendedTask); + } + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/TaskMessageClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/TaskMessageClr2Java.cpp b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/TaskMessageClr2Java.cpp new file mode 100644 index 0000000..d2f8286 --- /dev/null +++ b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/TaskMessageClr2Java.cpp @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "Clr2JavaImpl.h" + +namespace Org { + namespace Apache { + namespace Reef { + namespace Driver { + namespace Bridge { + ref class ManagedLog { + internal: + static BridgeLogger^ LOGGER = BridgeLogger::GetLogger("<C++>"); + }; + + TaskMessageClr2Java::TaskMessageClr2Java(JNIEnv *env, jobject jtaskMessage) { + ManagedLog::LOGGER->LogStart("TaskMessageClr2Java::TaskMessageClr2Java"); + pin_ptr<JavaVM*> pJavaVm = &_jvm; + if (env->GetJavaVM(pJavaVm) != 0) { + ManagedLog::LOGGER->LogError("Failed to get JavaVM", nullptr); + } + _jobjectTaskMessage = reinterpret_cast<jobject>(env->NewGlobalRef(jtaskMessage)); + + jclass jclassTaskMessage = env->GetObjectClass (_jobjectTaskMessage); + jfieldID jidTaskId = env->GetFieldID(jclassTaskMessage, "taskId", "Ljava/lang/String;"); + _jstringId = reinterpret_cast<jstring>(env->NewGlobalRef(env->GetObjectField(_jobjectTaskMessage, jidTaskId))); + ManagedLog::LOGGER->LogStop("TaskMessageClr2Java::TaskMessageClr2Java"); + } + + void TaskMessageClr2Java::OnError(String^ message) { + ManagedLog::LOGGER->Log("TaskMessageClr2Java::OnError"); + JNIEnv *env = RetrieveEnv(_jvm); + HandleClr2JavaError(env, message, _jobjectTaskMessage); + } + + String^ TaskMessageClr2Java::GetId() { + ManagedLog::LOGGER->Log("TaskMessageClr2Java::GetId"); + JNIEnv *env = RetrieveEnv(_jvm); + return ManagedStringFromJavaString(env, _jstringId); + } + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/cs/Tests/ReefTests/bin/reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar ---------------------------------------------------------------------- diff --git a/lang/cs/Tests/ReefTests/bin/reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar b/lang/cs/Tests/ReefTests/bin/reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar index e43d8bf..9f25ae3 100644 Binary files 
a/lang/cs/Tests/ReefTests/bin/reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar and b/lang/cs/Tests/ReefTests/bin/reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar differ http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/pom.xml b/lang/java/reef-bridge-java/pom.xml new file mode 100644 index 0000000..e93274d --- /dev/null +++ b/lang/java/reef-bridge-java/pom.xml @@ -0,0 +1,116 @@ +<?xml version="1.0"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+-->
+<!-- Maven build for the Java half of the JVM/CLR bridge. -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>reef-bridge-java</artifactId>
+  <name>REEF Bridge Java</name>
+  <description>Bridge between JVM and CLR.</description>
+
+  <parent>
+    <groupId>org.apache.reef</groupId>
+    <artifactId>reef-project</artifactId>
+    <version>0.11.0-incubating-SNAPSHOT</version>
+    <relativePath>../../..</relativePath>
+  </parent>
+
+  <!-- The REEF modules below are pinned to this project's own groupId and version.
+       The avro dependency declares no version here; presumably it is managed by the
+       parent POM (confirm). -->
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>reef-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>reef-runtime-local</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>reef-runtime-yarn</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>reef-io</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>reef-checkpoint</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>reef-webserver</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Declares JavaBridge as the jar's main class. addClasspath is false, so no
+           Class-Path manifest entry is requested.
+           NOTE(review): classpathPrefix looks like it has no effect while addClasspath
+           is false; confirm and drop it if so. -->
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <addClasspath>false</addClasspath>
+              <classpathPrefix>lib/</classpathPrefix>
+              <mainClass>org.apache.reef.javabridge.JavaBridge</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+      <!-- In the process-classes phase, runs javah for
+           org.apache.reef.javabridge.NativeInterop using the compile classpath and
+           writes its output to ${project.build.directory}/classes.
+           NOTE(review): the exec element sets no failonerror attribute, so a failing
+           javah run may not fail the build; confirm whether that is intended. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>process-classes</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <exportAntProperties>true</exportAntProperties>
+              <target>
+                <property name="runtime_classpath" refid="maven.compile.classpath"/>
+                <exec executable="javah">
+                  <arg value="-cp"/>
+                  <arg value="${runtime_classpath}"/>
+                  <arg value="-d"/>
+                  <arg value="${project.build.directory}/classes"/>
+                  <arg value="org.apache.reef.javabridge.NativeInterop"/>
+                </exec>
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
new file mode 100644
index 0000000..a0dedf5
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.io.naming.Identifiable; +import org.apache.reef.tang.ClassHierarchy; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.formats.AvroConfigurationSerializer; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ActiveContextBridge extends NativeBridge implements Identifiable { + private static final Logger LOG = Logger.getLogger(ActiveContextBridge.class.getName()); + + private ActiveContext jactiveContext; + + private AvroConfigurationSerializer serializer; + + private String contextId; + + private String evaluatorId; + + public ActiveContextBridge(ActiveContext activeContext) { + jactiveContext = activeContext; + serializer = new AvroConfigurationSerializer(); + contextId = activeContext.getId(); + evaluatorId = activeContext.getEvaluatorId(); + } + + public void submitTaskString(final String taskConfigurationString) { + + if (taskConfigurationString.isEmpty()) { + throw new RuntimeException("empty taskConfigurationString provided."); + } + ClassHierarchy clrClassHierarchy = Utilities.loadClassHierarchy(NativeInterop.CLASS_HIERARCHY_FILENAME); + Configuration taskConfiguration; + try { + taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy); + } catch (final Exception e) { + final String message = "Unable to de-serialize CLR task configurations using class hierarchy."; + LOG.log(Level.SEVERE, message, e); + throw new RuntimeException(message, 
e); + } + jactiveContext.submitTask(taskConfiguration); + } + + public String getEvaluatorDescriptorSring() { + final String descriptorString = Utilities.getEvaluatorDescriptorString(jactiveContext.getEvaluatorDescriptor()); + LOG.log(Level.FINE, "active context - serialized evaluator descriptor: " + descriptorString); + return descriptorString; + } + + @Override + public void close() { + jactiveContext.close(); + } + + @Override + public String getId() { + return contextId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java new file mode 100644 index 0000000..5d88355 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.tang.ClassHierarchy; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.formats.AvroConfigurationSerializer; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class AllocatedEvaluatorBridge extends NativeBridge { + + private static final Logger LOG = Logger.getLogger(AllocatedEvaluatorBridge.class.getName()); + + private final AllocatedEvaluator jallocatedEvaluator; + private final AvroConfigurationSerializer serializer; + private final ClassHierarchy clrClassHierarchy; + private final String evaluatorId; + private final String nameServerInfo; + + public AllocatedEvaluatorBridge(final AllocatedEvaluator allocatedEvaluator, final String serverInfo) { + jallocatedEvaluator = allocatedEvaluator; + serializer = new AvroConfigurationSerializer(); + clrClassHierarchy = Utilities.loadClassHierarchy(NativeInterop.CLASS_HIERARCHY_FILENAME); + evaluatorId = allocatedEvaluator.getId(); + nameServerInfo = serverInfo; + } + + public void submitContextAndTaskString(final String contextConfigurationString, final String taskConfigurationString) { + if (contextConfigurationString.isEmpty()) { + throw new RuntimeException("empty contextConfigurationString provided."); + } + if (taskConfigurationString.isEmpty()) { + throw new RuntimeException("empty taskConfigurationString provided."); + } + Configuration contextConfiguration; + Configuration taskConfiguration; + try { + contextConfiguration = serializer.fromString(contextConfigurationString, clrClassHierarchy); + taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy); + } catch (final Exception e) { + final String message = "Unable to de-serialize CLR context or task configurations using class hierarchy."; + LOG.log(Level.SEVERE, message, e); + throw new RuntimeException(message, e); + } + 
jallocatedEvaluator.submitContextAndTask(contextConfiguration, taskConfiguration); + } + + public void submitContextString(final String contextConfigurationString) { + if (contextConfigurationString.isEmpty()) { + throw new RuntimeException("empty contextConfigurationString provided."); + } + Configuration contextConfiguration; + try { + contextConfiguration = serializer.fromString(contextConfigurationString, clrClassHierarchy); + } catch (final Exception e) { + final String message = "Unable to de-serialize CLR context configurations using class hierarchy."; + LOG.log(Level.SEVERE, message, e); + throw new RuntimeException(message, e); + } + jallocatedEvaluator.submitContext(contextConfiguration); + } + + public void submitContextAndServiceString(final String contextConfigurationString, final String serviceConfigurationString) { + if (contextConfigurationString.isEmpty()) { + throw new RuntimeException("empty contextConfigurationString provided."); + } + if (serviceConfigurationString.isEmpty()) { + throw new RuntimeException("empty serviceConfigurationString provided."); + } + + Configuration contextConfiguration; + Configuration servicetConfiguration; + try { + contextConfiguration = serializer.fromString(contextConfigurationString, clrClassHierarchy); + servicetConfiguration = serializer.fromString(serviceConfigurationString, clrClassHierarchy); + } catch (final Exception e) { + final String message = "Unable to de-serialize CLR context or service configurations using class hierarchy."; + LOG.log(Level.SEVERE, message, e); + throw new RuntimeException(message, e); + } + jallocatedEvaluator.submitContextAndService(contextConfiguration, servicetConfiguration); + } + + public void submitContextAndServiceAndTaskString( + final String contextConfigurationString, + final String serviceConfigurationString, + final String taskConfigurationString) { + if (contextConfigurationString.isEmpty()) { + throw new RuntimeException("empty contextConfigurationString provided."); 
+ } + if (serviceConfigurationString.isEmpty()) { + throw new RuntimeException("empty serviceConfigurationString provided."); + } + if (taskConfigurationString.isEmpty()) { + throw new RuntimeException("empty taskConfigurationString provided."); + } + Configuration contextConfiguration; + Configuration servicetConfiguration; + Configuration taskConfiguration; + try { + contextConfiguration = serializer.fromString(contextConfigurationString, clrClassHierarchy); + servicetConfiguration = serializer.fromString(serviceConfigurationString, clrClassHierarchy); + taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy); + } catch (final Exception e) { + final String message = "Unable to de-serialize CLR context or service or task configurations using class hierarchy."; + LOG.log(Level.SEVERE, message, e); + throw new RuntimeException(message, e); + } + jallocatedEvaluator.submitContextAndServiceAndTask(contextConfiguration, servicetConfiguration, taskConfiguration); + } + + public String getEvaluatorDescriptorSring() { + String descriptorString = Utilities.getEvaluatorDescriptorString(jallocatedEvaluator.getEvaluatorDescriptor()); + LOG.log(Level.INFO, "allocated evaluator - serialized evaluator descriptor: " + descriptorString); + return descriptorString; + } + + @Override + public void close() { + jallocatedEvaluator.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java new file mode 100644 index 0000000..62f9ce7 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java @@ -0,0 +1,81 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.util.Optional; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ClosedContextBridge extends NativeBridge implements ClosedContext { + + private static final Logger LOG = Logger.getLogger(ClosedContextBridge.class.getName()); + + private final ClosedContext jcloseContext; + private final ActiveContextBridge parentContext; + private final String contextId; + private final String evaluatorId; + private final EvaluatorDescriptor evaluatorDescriptor; + + public ClosedContextBridge(final ClosedContext closedContext) { + jcloseContext = closedContext; + parentContext = new ActiveContextBridge(closedContext.getParentContext()); + contextId = closedContext.getId(); + evaluatorId = closedContext.getEvaluatorId(); + evaluatorDescriptor = closedContext.getEvaluatorDescriptor(); + } + + @Override + public String getId() { + return contextId; + } + + @Override + public String getEvaluatorId() { + return evaluatorId; + 
} + + @Override + public Optional<String> getParentId() { + return Optional.of(parentContext.getId()); + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return evaluatorDescriptor; + } + + @Override + public void close() throws Exception { + } + + public String getEvaluatorDescriptorSring() { + String descriptorString = Utilities.getEvaluatorDescriptorString(evaluatorDescriptor); + LOG.log(Level.INFO, "Closed Context - serialized evaluator descriptor: " + descriptorString); + return descriptorString; + } + + @Override + public ActiveContext getParentContext() { + return jcloseContext.getParentContext(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedEvaluatorBridge.java new file mode 100644 index 0000000..0e300fd --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedEvaluatorBridge.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.io.naming.Identifiable; + +public class CompletedEvaluatorBridge extends NativeBridge implements Identifiable { + + private final CompletedEvaluator jcompletedEvaluator; + + private final String evaluatorId; + + public CompletedEvaluatorBridge(CompletedEvaluator completedEvaluator) { + jcompletedEvaluator = completedEvaluator; + evaluatorId = completedEvaluator.getId(); + } + + @Override + public String getId() { + return evaluatorId; + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java new file mode 100644 index 0000000..c95ca14 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.task.CompletedTask; + +public class CompletedTaskBridge extends NativeBridge { + + private CompletedTask jcompletedTask; + + private String taskId; + + private ActiveContextBridge jactiveContext; + + public CompletedTaskBridge(CompletedTask completedTask) { + jcompletedTask = completedTask; + taskId = completedTask.getId(); + jactiveContext = new ActiveContextBridge(completedTask.getActiveContext()); + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java new file mode 100644 index 0000000..eca4ba8 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.context.ContextMessage; + +public class ContextMessageBridge extends NativeBridge implements ContextMessage { + + private ContextMessage jcontextMessage; + private String contextMessageId; + private String messageSourceId; + private byte[] message; + + public ContextMessageBridge(ContextMessage contextMessage) { + jcontextMessage = contextMessage; + contextMessageId = contextMessage.getId(); + messageSourceId = contextMessage.getMessageSourceID(); + message = contextMessage.get(); + } + + @Override + public void close() throws Exception { + + } + + @Override + public byte[] get() { + return message; + } + + @Override + public String getId() { + return contextMessageId; + } + + @Override + public String getMessageSourceID() { + return messageSourceId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java new file mode 100644 index 0000000..a712fc4 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.util.logging.LoggingScope; +import org.apache.reef.util.logging.LoggingScopeFactory; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public final class EvaluatorRequestorBridge extends NativeBridge { + private static final Logger LOG = Logger.getLogger(EvaluatorRequestorBridge.class.getName()); + private final boolean isBlocked; + private final EvaluatorRequestor jevaluatorRequestor; + private final LoggingScopeFactory loggingScopeFactory; + + // accumulate how many evaluators have been submitted through this instance + // of EvaluatorRequestorBridge + private int clrEvaluatorsNumber; + + public EvaluatorRequestorBridge(final EvaluatorRequestor evaluatorRequestor, final boolean isBlocked, final LoggingScopeFactory loggingScopeFactory) { + this.jevaluatorRequestor = evaluatorRequestor; + this.clrEvaluatorsNumber = 0; + this.isBlocked = isBlocked; + this.loggingScopeFactory = loggingScopeFactory; + } + + public void submit(final int evaluatorsNumber, final int memory, final int virtualCore, final String rack) { + if (this.isBlocked) { + throw new 
RuntimeException("Cannot request additional Evaluator, this is probably because the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433."); + } + + if (rack != null && !rack.isEmpty()) { + LOG.log(Level.WARNING, "Ignoring rack preference."); + } + + try (final LoggingScope ls = loggingScopeFactory.evaluatorRequestSubmitToJavaDriver(evaluatorsNumber)) { + clrEvaluatorsNumber += evaluatorsNumber; + + final EvaluatorRequest request = EvaluatorRequest.newBuilder() + .setNumber(evaluatorsNumber) + .setMemory(memory) + .setNumberOfCores(virtualCore) + .build(); + + LOG.log(Level.FINE, "submitting evaluator request {0}", request); + jevaluatorRequestor.submit(request); + } + } + + public int getEvaluatorNumber() { + return clrEvaluatorsNumber; + } + + @Override + public void close() { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java new file mode 100644 index 0000000..dfed7f7 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.context.ContextBase; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.util.Optional; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class FailedContextBridge extends NativeBridge implements ContextBase { + + private static final Logger LOG = Logger.getLogger(FailedContextBridge.class.getName()); + + private final ActiveContextBridge parentContext; + private final EvaluatorDescriptor evaluatorDescriptor; + private final String evaluatorId; + private final String contextId; + private final String parentContextId; + private final FailedContext jfailedContext; + + public FailedContextBridge(final FailedContext failedContext) { + jfailedContext = failedContext; + evaluatorDescriptor = failedContext.getEvaluatorDescriptor(); + evaluatorId = failedContext.getEvaluatorId(); + contextId = failedContext.getId(); + parentContext = failedContext.getParentContext().isPresent() ? + new ActiveContextBridge(failedContext.getParentContext().get()) : null; + parentContextId = parentContext != null ? 
parentContext.getId() : null; + } + + @Override + public void close() throws Exception { + } + + @Override + public String getId() { + return contextId; + } + + @Override + public String getEvaluatorId() { + return evaluatorId; + } + + @Override + public Optional<String> getParentId() { + if (parentContextId != null) { + return Optional.of(parentContextId); + } else { + return Optional.empty(); + } + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return evaluatorDescriptor; + } + + public String getEvaluatorDescriptorSring() { + String descriptorString = Utilities.getEvaluatorDescriptorString(evaluatorDescriptor); + LOG.log(Level.INFO, "Failed Context - serialized evaluator descriptor: " + descriptorString); + return descriptorString; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java new file mode 100644 index 0000000..bae4946 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.util.logging.LoggingScopeFactory; + +import java.util.logging.Logger; + +public class FailedEvaluatorBridge extends NativeBridge { + private static final Logger LOG = Logger.getLogger(FailedEvaluatorBridge.class.getName()); + private FailedEvaluator jfailedEvaluator; + private EvaluatorRequestorBridge evaluatorRequestorBridge; + private String evaluatorId; + + public FailedEvaluatorBridge(FailedEvaluator failedEvaluator, EvaluatorRequestor evaluatorRequestor, boolean blockedForAdditionalEvaluator, final LoggingScopeFactory loggingScopeFactory) { + jfailedEvaluator = failedEvaluator; + evaluatorId = failedEvaluator.getId(); + evaluatorRequestorBridge = new EvaluatorRequestorBridge(evaluatorRequestor, blockedForAdditionalEvaluator, loggingScopeFactory); + } + + public int getNewlyRequestedEvaluatorNumber() { + return evaluatorRequestorBridge.getEvaluatorNumber(); + } + + @Override + public void close() { + } +} + http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java new file mode 100644 index 
0000000..30383ca --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.util.Optional; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class FailedTaskBridge extends NativeBridge { + private static final Logger LOG = Logger.getLogger(FailedTaskBridge.class.getName()); + + private FailedTask jfailedTask; + private ActiveContextBridge jactiveContext; + + public FailedTaskBridge(FailedTask failedTask) { + jfailedTask = failedTask; + Optional<ActiveContext> activeContext = failedTask.getActiveContext(); + jactiveContext = activeContext.isPresent() ? new ActiveContextBridge(activeContext.get()) : null; + } + + public String getFailedTaskString() { + final String description = jfailedTask.getDescription().isPresent() ? jfailedTask.getDescription().get().replace("=", "").replace(",", "") : ""; + final String cause = jfailedTask.getReason().isPresent() ? 
jfailedTask.getReason().get().toString().replace("=", "").replace(",", "") : ""; + final String data = jfailedTask.getData().isPresent() ? new String(jfailedTask.getData().get()).replace("=", "").replace(",", "") : ""; + + // TODO: deserialize/serialize with proper Avro schema + final String poorSerializedString = "Identifier=" + jfailedTask.getId().replace("=", "").replace(",", "") + + ", Message=" + jfailedTask.getMessage().replace("=", "").replace(",", "") + + ", Description=" + description + + ", Cause=" + cause + + ", Data=" + data; + + LOG.log(Level.INFO, "serialized failed task " + poorSerializedString); + return poorSerializedString; + } + + @Override + public void close() { + } +} + http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java new file mode 100644 index 0000000..3e8a4e5 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +public final class HttpServerEventBridge extends NativeBridge { + private String queryString; + private byte[] queryRequestData; + private byte[] queryResponseData; + private String queryResult; + private String uriSpecification; + + public HttpServerEventBridge(final String queryStr) { + this.queryString = queryStr; + } + + public HttpServerEventBridge(final byte[] queryRequestData) { + this.queryRequestData = queryRequestData; + } + + public final String getQueryString() { + return queryString; + } + + public final void setQueryString(final String queryStr) { + this.queryString = queryStr; + } + + public final String getQueryResult() { + return queryResult; + } + + public final void setQueryResult(final String queryResult) { + this.queryResult = queryResult; + } + + public final String getUriSpecification() { + return uriSpecification; + } + + public final void setUriSpecification(final String uriSpecification) { + this.uriSpecification = uriSpecification; + } + + public final byte[] getQueryRequestData() { + return queryRequestData; + } + + public final void setQueryRequestData(final byte[] queryRequestData) { + this.queryRequestData = queryRequestData; + } + + public final byte[] getQueryResponseData() { + return queryResponseData; + } + + public final void setQueryResponseData(final byte[] responseData) { + queryResponseData = responseData; + } + + @Override + public void close() { + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java new file mode 100644 index 0000000..8bfbdfa --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Logs messages whose severity arrives as a plain integer. The integer is
 * mapped back to a {@link Level} through a table keyed by
 * {@link Level#intValue()} of the nine standard levels.
 */
public class InteropLogger {
  private static final Logger LOG = Logger.getLogger("InteropLogger");

  // Lookup table: Level.intValue() -> Level, filled for every new instance.
  HashMap<Integer, Level> levelHashMap;

  {
    levelHashMap = new HashMap<Integer, Level>();
    final Level[] standardLevels = {
        Level.OFF, Level.SEVERE, Level.WARNING, Level.INFO,
        Level.CONFIG, Level.FINE, Level.FINER,
        Level.FINEST, Level.ALL,
    };
    for (final Level standardLevel : standardLevels) {
      levelHashMap.put(standardLevel.intValue(), standardLevel);
    }
  }

  /**
   * Logs {@code message} at the level whose integer value is {@code intLevel}.
   * When the integer does not match any entry of the table, two WARNING
   * records are written instead: one naming the invalid level, one carrying
   * the message itself.
   *
   * @param intLevel integer value of the desired level.
   * @param message  the text to log.
   */
  public void Log(int intLevel, String message) {
    if (!levelHashMap.containsKey(intLevel)) {
      LOG.log(Level.WARNING, "Level " + intLevel + " is not a valid Log level");
      LOG.log(Level.WARNING, message);
      return;
    }

    LOG.log(levelHashMap.get(intLevel), message);
  }
}
import java.util.ArrayList;

/**
 * Mutable carrier for the outcome of an interop call: an integer return code
 * plus a list of exception strings.
 */
public class InteropReturnInfo {

  // Return code of the call; 0 until set, and again after reset().
  int returnCode;
  // Exception strings collected so far; replaced by a new list on reset().
  ArrayList<String> exceptionList = new ArrayList<String>();

  /**
   * Appends one exception string to the list.
   *
   * @param exceptionString the text to record.
   */
  public void addExceptionString(String exceptionString) {
    this.exceptionList.add(exceptionString);
  }

  /**
   * @return true when at least one exception string has been recorded.
   */
  public boolean hasExceptions() {
    return this.exceptionList.size() > 0;
  }

  /**
   * @return the live list of recorded exception strings (not a copy).
   */
  public ArrayList<String> getExceptionList() {
    return this.exceptionList;
  }

  /**
   * @return the current return code.
   */
  public int getReturnCode() {
    return this.returnCode;
  }

  /**
   * @param rc the new return code.
   */
  public void setReturnCode(int rc) {
    this.returnCode = rc;
  }

  /**
   * Sets the return code back to 0 and installs a new, empty exception list.
   * A list obtained earlier through getExceptionList() is left untouched.
   */
  public void reset() {
    this.returnCode = 0;
    this.exceptionList = new ArrayList<String>();
  }
}
/**
 * Tries to load the native {@code JavaClrBridge} library when this class is
 * initialized. The attempt is best-effort: a failed load does not prevent the
 * class from initializing, but it is now logged instead of being silently
 * discarded.
 */
public class JavaBridge {
  private static final String CPP_BRIDGE = "JavaClrBridge";

  // Declared before the static block below so that it is initialized first.
  // Fully qualified to keep this class free of additional imports.
  private static final java.util.logging.Logger LOG =
      java.util.logging.Logger.getLogger(JavaBridge.class.getName());

  static {
    try {
      System.loadLibrary(CPP_BRIDGE);
    } catch (final UnsatisfiedLinkError e) {
      // Deliberately not rethrown: loading stays best-effort. The failure is
      // recorded so that a missing library can be diagnosed from the log.
      LOG.log(java.util.logging.Level.WARNING,
          "Unable to load native library " + CPP_BRIDGE + " via System.loadLibrary", e);
    }
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.javabridge; + +import org.apache.commons.compress.utils.IOUtils; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.util.logging.LoggingScope; +import org.apache.reef.util.logging.LoggingScopeFactory; + +import javax.inject.Inject; +import java.io.*; +import java.util.Date; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Loading CLR libraries + */ +public class LibLoader { + + private static final Logger LOG = Logger.getLogger(LibLoader.class.getName()); + + private static final String LIB_BIN = "/"; + private static final String DLL_EXTENSION = ".dll"; + private static final String USER_DIR = "user.dir"; + private static final String[] MANAGED_DLLS = { + "ClrHandler", + "msvcr110", + }; + + private final LoggingScopeFactory loggingScopeFactory; + + private final REEFFileNames reefFileNames; + + @Inject + private LibLoader(final LoggingScopeFactory loggingScopeFactory, final REEFFileNames reefFileNames) { + this.loggingScopeFactory = loggingScopeFactory; + this.reefFileNames = reefFileNames; + } + + /** + * Load CLR libraries + */ + public void loadLib() throws IOException { + LOG.log(Level.INFO, "Loading DLLs for driver at time {0}." 
+ new Date().toString()); + try (final LoggingScope lb = loggingScopeFactory.loadLib()) { + final String tempLoadDir = System.getProperty(USER_DIR) + this.reefFileNames.getLoadDir(); + LOG.log(Level.INFO, "load Folder: " + tempLoadDir); + new File(tempLoadDir).mkdir(); + + loadFromReefJar(this.reefFileNames.getCppBridge(), false); + + loadLibFromGlobal(); + + for (int i = 0; i < MANAGED_DLLS.length; i++) { + loadFromReefJar(MANAGED_DLLS[i], true); + } + } + LOG.log(Level.INFO, "Done loading DLLs for Driver at time {0}." + new Date().toString()); + } + + /** + * Load assemblies at global folder + */ + private void loadLibFromGlobal() { + final String globalFilePath = System.getProperty(USER_DIR) + this.reefFileNames.getReefGlobal(); + final File[] files = new File(globalFilePath).listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + return name.toLowerCase().endsWith(DLL_EXTENSION); + } + }); + + LOG.log(Level.INFO, "Total dll files to load from {0} is {1}.", new Object[] {globalFilePath, files.length} ); + for (int i = 0; i < files.length; i++) { + try { + LOG.log(Level.INFO, "file to load : " + files[i].toString()); + NativeInterop.loadClrAssembly(files[i].toString()); + } catch (final Exception e) { + LOG.log(Level.SEVERE, "exception in loading dll library: ", files[i].toString()); + throw e; + } + } + } + + /** + * Get file from jar file and copy it to temp dir and loads the library to memory + **/ + private void loadFromReefJar(String name, final boolean managed) throws IOException { + + name = name + DLL_EXTENSION; + try { + File fileOut = null; + // get input file from jar + final String path = this.reefFileNames.getReefDriverAppDllDir() + name; + LOG.log(Level.INFO, "Source file path: " + path); + final java.net.URL url = NativeInterop.class.getClass().getResource(path); + if (url != null) { + LOG.log(Level.INFO, "Source file: " + url.getPath()); + } + try (final InputStream in = NativeInterop.class.getResourceAsStream(path)) { 
+ //copy to /reef/CLRLoadingDirectory + final String tempLoadDir = System.getProperty(USER_DIR) + this.reefFileNames.getLoadDir(); + fileOut = new File(tempLoadDir + LIB_BIN + name); + LOG.log(Level.INFO, "Destination file: " + fileOut.toString()); + if (null == in) { + LOG.log(Level.WARNING, "Cannot find " + path); + return; + } + try (final OutputStream out = new FileOutputStream(fileOut) ) { + IOUtils.copy(in, out); + } + } + loadAssembly(fileOut, managed); + } catch (final FileNotFoundException e) { + LOG.log(Level.SEVERE, "File not find exception: ", name); + throw e; + } catch (IOException e) { + LOG.log(Level.SEVERE, "File copy error: ", name); + throw e; + } + } + + /** + * load assembly + * @param fileOut + * @param managed + */ + private void loadAssembly(final File fileOut, final boolean managed) { + if (managed) { + NativeInterop.loadClrAssembly(fileOut.toString()); + LOG.log(Level.INFO, "Loading DLL managed done"); + } else { + System.load(fileOut.toString()); + LOG.log(Level.INFO, "Loading DLL not managed done"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java new file mode 100644 index 0000000..4249ba7 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Common base class of the bridge objects. It contributes the error callback
 * {@link #onError(String)}; {@code close()} is left to the subclasses.
 */
public abstract class NativeBridge implements AutoCloseable {

  // Named after this class (previously it was named after ActiveContextBridge,
  // which attributed every bridge error to the wrong logger).
  private static final Logger LOG = Logger.getLogger(NativeBridge.class.getName());

  /**
   * Logs the given error at SEVERE and then raises it as an exception.
   *
   * @param errorMessage the error text; it is appended to
   *                     "Bridge received error from CLR: ".
   * @throws RuntimeException always, carrying the prefixed message.
   */
  public void onError(String errorMessage) {
    final String fullMessage = "Bridge received error from CLR: " + errorMessage;
    LOG.log(Level.SEVERE, fullMessage);
    throw new RuntimeException(fullMessage);
  }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import java.util.HashMap; + +public class NativeInterop { + public static final String CLASS_HIERARCHY_FILENAME = "clrClassHierarchy.bin"; + public static final String GLOBAL_LIBRARIES_FILENAME = "userSuppliedGlobalLibraries.txt"; + public static final String EvaluatorRequestorKey = "EvaluatorRequestor"; + public static final String AllocatedEvaluatorKey = "AllocatedEvaluator"; + public static final String ActiveContextKey = "ActiveContext"; + public static final String TaskMessageKey = "TaskMessage"; + public static final String FailedTaskKey = "FailedTask"; + public static final String FailedEvaluatorKey = "FailedEvaluator"; + public static final String HttpServerKey = "HttpServerKey"; + public static final String CompletedTaskKey = "CompletedTask"; + public static final String RunningTaskKey = "RunningTask"; + public static final String SuspendedTaskKey = "SuspendedTask"; + public static final String CompletedEvaluatorKey = "CompletedEvaluator"; + public static final String ClosedContextKey = "ClosedContext"; + public static final String FailedContextKey = "FailedContext"; + public static final String ContextMessageKey = "ContextMessage"; + public static final String DriverRestartKey = "DriverRestart"; + public static final String DriverRestartActiveContextKey = "DriverRestartActiveContext"; + public static final String 
DriverRestartRunningTaskKey = "DriverRestartRunningTask"; + public static final HashMap<String, Integer> Handlers = new HashMap<String, Integer>() { + { + put(EvaluatorRequestorKey, 0); + put(AllocatedEvaluatorKey, 1); + put(ActiveContextKey, 2); + put(TaskMessageKey, 3); + put(FailedTaskKey, 4); + put(FailedEvaluatorKey, 5); + put(HttpServerKey, 6); + put(CompletedTaskKey, 7); + put(RunningTaskKey, 8); + put(SuspendedTaskKey, 9); + put(CompletedEvaluatorKey, 10); + put(ClosedContextKey, 11); + put(FailedContextKey, 12); + put(ContextMessageKey, 13); + put(DriverRestartKey, 14); + put(DriverRestartActiveContextKey, 15); + put(DriverRestartRunningTaskKey, 16); + } + }; + + public static final int nHandlers = 17; + + public static native void loadClrAssembly(String filePath); + + public static native void ClrBufferedLog(int level, String message); + + public static native long[] CallClrSystemOnStartHandler(String dateTime, String httpServerPortNumber); + + public static native void ClrSystemAllocatedEvaluatorHandlerOnNext( + long handle, + AllocatedEvaluatorBridge javaEvaluatorBridge, + InteropLogger interopLogger + ); + + public static native void ClrSystemActiveContextHandlerOnNext( + long handle, + ActiveContextBridge javaActiveContextBridge, + InteropLogger interopLogger + ); + + public static native void ClrSystemEvaluatorRequstorHandlerOnNext( + long handle, + EvaluatorRequestorBridge javaEvluatorRequstorBridge, + InteropLogger interopLogger + ); + + public static native void ClrSystemTaskMessageHandlerOnNext( + long handle, + byte[] mesage, + TaskMessageBridge javaTaskMessageBridge, + InteropLogger interopLogger + ); + + public static native void ClrSystemFailedTaskHandlerOnNext( + long handle, + FailedTaskBridge failedTaskBridge, + InteropLogger interopLogger + ); + + public static native void ClrSystemHttpServerHandlerOnNext( + long handle, + HttpServerEventBridge httpServerEventBridge, + InteropLogger interopLogger + ); + + public static native void 
ClrSystemFailedEvaluatorHandlerOnNext( + long handle, + FailedEvaluatorBridge failedEvaluatorBridge, + InteropLogger interopLogger + ); + + public static native void ClrSystemCompletedTaskHandlerOnNext( + long handle, + CompletedTaskBridge completedTaskBridge, + InteropLogger interopLogger + ); + + public static native void ClrSystemRunningTaskHandlerOnNext( + long handle, + RunningTaskBridge runningTaskBridge, + InteropLogger interopLogger + ); + + public static native void ClrSystemSupendedTaskHandlerOnNext( + long handle, + SuspendedTaskBridge suspendedTaskBridge + ); + + public static native void ClrSystemCompletdEvaluatorHandlerOnNext( + long handle, + CompletedEvaluatorBridge completedEvaluatorBridge + ); + + public static native void ClrSystemClosedContextHandlerOnNext( + long handle, + ClosedContextBridge closedContextBridge + ); + + public static native void ClrSystemFailedContextHandlerOnNext( + long handle, + FailedContextBridge failedContextBridge + ); + + public static native void ClrSystemContextMessageHandlerOnNext( + long handle, + ContextMessageBridge contextMessageBridge + ); + + public static native void ClrSystemDriverRestartHandlerOnNext( + long handle + ); + + public static native void ClrSystemDriverRestartActiveContextHandlerOnNext( + long handle, + ActiveContextBridge activeContextBridge + ); + + public static native void ClrSystemDriverRestartRunningTaskHandlerOnNext( + long handle, + RunningTaskBridge runningTaskBridge + ); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java new file mode 100644 index 0000000..301c4fc --- /dev/null +++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.RunningTask; + +import java.util.logging.Logger; + +public class RunningTaskBridge extends NativeBridge { + private static final Logger LOG = Logger.getLogger(RunningTaskBridge.class.getName()); + + final private RunningTask jrunningTask; + final private ActiveContextBridge jactiveContext; + + public RunningTaskBridge(RunningTask runningTask) { + jrunningTask = runningTask; + final ActiveContext activeContext = runningTask.getActiveContext(); + jactiveContext = new ActiveContextBridge(activeContext); + } + + public final String getId() { + return jrunningTask.getId(); + } + + public final void send(final byte[] message) { + jrunningTask.send(message); + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java new file mode 100644 index 0000000..16fa3d3 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.task.SuspendedTask; +import org.apache.reef.io.Message; +import org.apache.reef.io.naming.Identifiable; + +public class SuspendedTaskBridge extends NativeBridge implements Identifiable, Message { + + private final SuspendedTask jsuspendedTask; + private final String taskId; + private final ActiveContextBridge jactiveContext; + + public SuspendedTaskBridge(SuspendedTask suspendedTask) { + jsuspendedTask = suspendedTask; + taskId = suspendedTask.getId(); + jactiveContext = new ActiveContextBridge(jsuspendedTask.getActiveContext()); + } + + public ActiveContextBridge getActiveContext() { + return jactiveContext; + } + + @Override + public void close() { + } + + @Override + public String getId() { + return taskId; + } + + @Override + public byte[] get() { + return jsuspendedTask.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java new file mode 100644 index 0000000..25b0478 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.task.TaskMessage; + +public class TaskMessageBridge extends NativeBridge { + private TaskMessage jtaskMessage; + private String taskId; + + // we don't really need to pass this around, just have this as place holder for future. + public TaskMessageBridge(TaskMessage taskMessage) { + jtaskMessage = taskMessage; + taskId = taskMessage.getId(); + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java new file mode 100644 index 0000000..e6d0849 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.tang.ClassHierarchy; +import org.apache.reef.tang.implementation.protobuf.ProtocolBufferClassHierarchy; +import org.apache.reef.tang.proto.ClassHierarchyProto; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class Utilities { + public static ClassHierarchy loadClassHierarchy(String classHierarchyFile) { + Path p = Paths.get(classHierarchyFile); + if (!Files.exists(p)) { + p = Paths.get(System.getProperty("user.dir") + "/reef/global/" + classHierarchyFile); + } + if (!Files.exists(p)) { + throw new RuntimeException("cannot find file " + p.toAbsolutePath()); + } + try (final InputStream chin = new FileInputStream(p.toAbsolutePath().toString())) { + final ClassHierarchyProto.Node root = ClassHierarchyProto.Node.parseFrom(chin); + final ClassHierarchy ch = new ProtocolBufferClassHierarchy(root); + return ch; + } catch (final IOException e) { + final String message = "Unable to load class hierarchy from " + classHierarchyFile; + throw new RuntimeException(message, e); + } + } + + public static String getEvaluatorDescriptorString(EvaluatorDescriptor evaluatorDescriptor) { + InetSocketAddress socketAddress = evaluatorDescriptor.getNodeDescriptor().getInetSocketAddress(); + return "IP=" + socketAddress.getAddress() + ", Port=" + socketAddress.getPort() + 
", HostName=" + socketAddress.getHostName() + ", Memory=" + evaluatorDescriptor.getMemory() + ", Core=" + evaluatorDescriptor.getNumberOfCores(); + } +}
