[
https://issues.apache.org/jira/browse/GEODE-4054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16289993#comment-16289993
]
ASF GitHub Bot commented on GEODE-4054:
---------------------------------------
upthewaterspout closed pull request #1141: GEODE-4054: Create module for
Protobuf message-based client
URL: https://github.com/apache/geode/pull/1141
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/geode-assembly/build.gradle b/geode-assembly/build.gradle
index 7be16ab645..1e7bece275 100755
--- a/geode-assembly/build.gradle
+++ b/geode-assembly/build.gradle
@@ -62,6 +62,7 @@ dependencies {
archives project(':geode-lucene')
archives project(':geode-old-client-support')
archives project(':geode-protobuf')
+ archives project(':geode-protobuf-client')
archives project(':geode-protobuf-messages')
archives project(':geode-web')
archives project(':geode-web-api')
@@ -372,6 +373,9 @@ distributions {
from project(":geode-protobuf").configurations.runtime
from
project(":geode-protobuf").configurations.archives.allArtifacts.files
+ from project(":geode-protobuf-client").configurations.runtime
+ from
project(":geode-protobuf-client").configurations.archives.allArtifacts.files
+
from project(":geode-protobuf-messages").configurations.runtime
from
project(":geode-protobuf-messages").configurations.archives.allArtifacts.files
diff --git a/geode-protobuf-client/build.gradle
b/geode-protobuf-client/build.gradle
new file mode 100644
index 0000000000..708dd185b0
--- /dev/null
+++ b/geode-protobuf-client/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+ compile project(':geode-core')
+ compile project(':geode-common')
+ compile project(':geode-protobuf-messages')
+
+ testCompile project(':geode-junit')
+ testCompile files(project(':geode-core').sourceSets.test.output)
+
+ testCompile 'org.powermock:powermock-core:' + project.'powermock.version'
+ testCompile 'org.powermock:powermock-module-junit4:' +
project.'powermock.version'
+ testCompile 'org.powermock:powermock-api-mockito:' +
project.'powermock.version'
+
+ compile 'com.google.protobuf:protobuf-java:' +
project.'protobuf-java.version'
+}
diff --git
a/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/Cache.java
b/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/Cache.java
new file mode 100644
index 0000000000..8f325be761
--- /dev/null
+++
b/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/Cache.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.client.protobuf;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
+
+@Experimental
+public class Cache {
+ final Socket socket;
+
+ Cache(String host, int port) throws Exception {
+ socket = new Socket(host, port);
+
+ final OutputStream outputStream = socket.getOutputStream();
+ outputStream.write(0x6E); // Magic byte
+ outputStream.write(0x01); // Major version
+
+ final InputStream inputStream = socket.getInputStream();
+ ClientProtocol.Message.newBuilder()
+ .setRequest(ClientProtocol.Request.newBuilder()
+ .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
+
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
+ .build().writeDelimitedTo(outputStream);
+
+ if
(!ClientProtocol.Message.parseDelimitedFrom(inputStream).getResponse().getHandshakeResponse()
+ .getHandshakePassed()) {
+ throw new Exception("Failed handshake.");
+ }
+ }
+
+ public Set<String> getRegionNames() throws Exception {
+ Set<String> regionNames = new HashSet<>();
+
+ final OutputStream outputStream = socket.getOutputStream();
+ ClientProtocol.Message.newBuilder()
+ .setRequest(ClientProtocol.Request.newBuilder()
+
.setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder()))
+ .build().writeDelimitedTo(outputStream);
+
+ final InputStream inputStream = socket.getInputStream();
+ final RegionAPI.GetRegionNamesResponse getRegionNamesResponse =
ClientProtocol.Message
+
.parseDelimitedFrom(inputStream).getResponse().getGetRegionNamesResponse();
+ for (int i = 0; i < getRegionNamesResponse.getRegionsCount(); ++i) {
+ regionNames.add(getRegionNamesResponse.getRegions(i));
+ }
+
+ return regionNames;
+ }
+
+ public RegionAttributes getRegionAttributes(String regionName) throws
Exception {
+ final OutputStream outputStream = socket.getOutputStream();
+ ClientProtocol.Message.newBuilder()
+ .setRequest(ClientProtocol.Request.newBuilder()
+
.setGetRegionRequest(RegionAPI.GetRegionRequest.newBuilder().setRegionName(regionName)))
+ .build().writeDelimitedTo(outputStream);
+
+ final InputStream inputStream = socket.getInputStream();
+ return new
RegionAttributes(ClientProtocol.Message.parseDelimitedFrom(inputStream).getResponse()
+ .getGetRegionResponse().getRegion());
+ }
+
+ public <K, V> Region<K, V> getRegion(String regionName) {
+ return new Region(regionName, socket);
+ }
+
+ public class RegionAttributes {
+ public String name;
+ public String dataPolicy;
+ public String scope;
+ public String keyConstraint;
+ public String valueConstraint;
+ public boolean persisted;
+ public long size;
+
+ public RegionAttributes(BasicTypes.Region region) {
+ this.name = region.getName();
+ this.dataPolicy = region.getDataPolicy();
+ this.scope = region.getScope();
+ this.keyConstraint = region.getKeyConstraint();
+ this.valueConstraint = region.getValueConstraint();
+ this.persisted = region.getPersisted();
+ this.size = region.getSize();
+ }
+ }
+}
diff --git
a/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/CacheFactory.java
b/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/CacheFactory.java
new file mode 100644
index 0000000000..aa65f3c216
--- /dev/null
+++
b/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/CacheFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.client.protobuf;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
+
+@Experimental
+public class CacheFactory {
+ public static class ServerInfo {
+ public String host;
+ public int port;
+
+ public ServerInfo(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+ }
+
+ String host;
+ int port;
+
+ public CacheFactory() {
+ // NOP
+ }
+
+ public CacheFactory addLocator(String host, int port) {
+ this.host = host;
+ this.port = port;
+ return this;
+ }
+
+ public Collection<ServerInfo> getAvailableServers() throws Exception {
+ final Socket locatorSocket = new Socket(host, port);
+
+ OutputStream outputStream = locatorSocket.getOutputStream();
+ // Once GEODE-4010 is fixed, this can revert to just the communication
mode and major version.
+ outputStream.write(0x00); // NON_GOSSIP_REQUEST_VERSION
+ outputStream.write(0x00); // NON_GOSSIP_REQUEST_VERSION
+ outputStream.write(0x00); // NON_GOSSIP_REQUEST_VERSION
+ outputStream.write(0x00); // NON_GOSSIP_REQUEST_VERSION
+ outputStream.write(0x6E); // Magic byte
+ outputStream.write(0x01); // Major version
+
+ ClientProtocol.Message.newBuilder()
+ .setRequest(ClientProtocol.Request.newBuilder()
+
.setGetAvailableServersRequest(LocatorAPI.GetAvailableServersRequest.newBuilder()))
+ .build().writeDelimitedTo(outputStream);
+
+ InputStream inputStream = locatorSocket.getInputStream();
+ LocatorAPI.GetAvailableServersResponse getAvailableServersResponse =
ClientProtocol.Message
+
.parseDelimitedFrom(inputStream).getResponse().getGetAvailableServersResponse();
+ if (getAvailableServersResponse.getServersCount() < 1) {
+ throw new Exception("No available servers");
+ }
+
+ ArrayList<ServerInfo> availableServers =
+ new ArrayList<>(getAvailableServersResponse.getServersCount());
+ for (int i = 0; i < getAvailableServersResponse.getServersCount(); ++i) {
+ final BasicTypes.Server server =
getAvailableServersResponse.getServers(i);
+ availableServers.add(new ServerInfo(server.getHostname(),
server.getPort()));
+ }
+ return availableServers;
+ }
+
+ public Cache connect() throws Exception {
+ final ServerInfo arbitraryServer = getAvailableServers().iterator().next();
+ return new Cache(arbitraryServer.host, arbitraryServer.port);
+ }
+}
diff --git
a/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/MethodInvoker.java
b/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/MethodInvoker.java
new file mode 100644
index 0000000000..e40cb397a0
--- /dev/null
+++
b/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/MethodInvoker.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.client.protobuf;
+
+import java.io.BufferedReader;
+import java.io.Reader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.geode.annotations.Experimental;
+
+@Experimental
+public class MethodInvoker {
+ public interface Prompter {
+ void prompt();
+ }
+
+ public interface Transcriber {
+ void transcribe(String str);
+ }
+
+ private static void invoke(Queue<String> tokens, Object object, String name)
throws Throwable {
+ final Method method = findMethod(object.getClass(), name);
+ if (method == null) {
+ throw new NoSuchMethodException("Method " + name + " does not exist in "
+ object.getClass());
+ }
+
+ if (0 < method.getParameterCount()) {
+ Object[] parameters = new Object[method.getParameterCount()];
+ int index = 0;
+ for (Type type : method.getParameterTypes()) {
+ // Adjust the type to box primitive types.
+ if (boolean.class == type) {
+ type = Boolean.class;
+ } else if (byte.class == type) {
+ type = Byte.class;
+ } else if (short.class == type) {
+ type = Short.class;
+ } else if (int.class == type) {
+ type = Integer.class;
+ } else if (long.class == type) {
+ type = Long.class;
+ } else if (float.class == type) {
+ type = Float.class;
+ } else if (double.class == type) {
+ type = Double.class;
+ }
+
+ if (!tokens.isEmpty()) {
+ final String parameter = tokens.remove();
+ try {
+ Constructor constructor = ((Class)
type).getConstructor(String.class);
+ parameters[index++] = constructor.newInstance(parameter);
+ } catch (IllegalAccessException | InstantiationException |
NoSuchMethodException e) {
+ e.printStackTrace(System.err);
+ parameters[index++] = parameter;
+ } catch (InvocationTargetException e) {
+ throw e.getTargetException();
+ }
+ } else {
+ parameters[index++] = null;
+ }
+ }
+ try {
+ method.invoke(object, parameters);
+ } catch (IllegalAccessException e) {
+ e.printStackTrace(System.err);
+ } catch (InvocationTargetException e) {
+ throw e.getTargetException();
+ }
+ } else {
+ try {
+ method.invoke(object);
+ } catch (IllegalAccessException e) {
+ e.printStackTrace(System.err);
+ } catch (InvocationTargetException e) {
+ throw e.getTargetException();
+ }
+ }
+ }
+
+ private static Method findMethod(Class clazz, String name) {
+ if (name != null) {
+ for (Method method : clazz.getDeclaredMethods()) {
+ if (name.equals(method.getName())) {
+ return method;
+ }
+ }
+ }
+ return null;
+ }
+
+ protected void invoke(Reader reader, Object object) throws Throwable {
+ invoke(reader, object, () -> {
+ }, (str) -> {
+ });
+ }
+
+ protected void invoke(Reader reader, Object object, Prompter prompter,
Transcriber transcriber)
+ throws Throwable {
+ final BufferedReader input = new BufferedReader(reader);
+ while (true) {
+ prompter.prompt();
+ String line = input.readLine();
+ if (null == line) {
+ break;
+ }
+
+ final int index = line.indexOf('#');
+ if (0 <= index) {
+ line = line.substring(0, index);
+ }
+
+ line = line.trim();
+ if (line.isEmpty()) {
+ continue;
+ }
+ transcriber.transcribe(line);
+
+ Queue<String> tokens = new LinkedList<String>();
+ String[] strings = line.split("(?<=[^\\\\])[ \t]");
+ for (String string : strings) {
+ string = string.replace("#.*", "");
+ string = string.replace("\\ ", " ");
+ string = string.replace("\\\t", "\t");
+ tokens.add(string);
+ }
+ while (!tokens.isEmpty()) {
+ final String name = tokens.remove();
+ try {
+ invoke(tokens, object, name);
+ } catch (NoSuchMethodException nsm) {
+ System.err.println(nsm.getMessage());
+ }
+ }
+ }
+ }
+}
diff --git
a/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/ProtobufClientRunner.java
b/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/ProtobufClientRunner.java
new file mode 100644
index 0000000000..5fb82db3d7
--- /dev/null
+++
b/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/ProtobufClientRunner.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.client.protobuf;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+
+import org.apache.geode.annotations.Experimental;
+
+@Experimental
+public class ProtobufClientRunner extends MethodInvoker {
+ private static class FilePrompterAndTranscriber implements Prompter,
Transcriber {
+ private final String path;
+ private int line = 0;
+
+ private FilePrompterAndTranscriber(File file) {
+ path = file.getAbsolutePath();
+ }
+
+ public void prompt() {
+ ++line;
+ }
+
+ public void transcribe(String str) {
+ System.out.println("# File " + path + ", line " + line + ": " + str);
+ }
+ }
+
+ private CacheFactory cacheFactory = null;
+ private Cache cache = null;
+ private Region<Integer, String> region = null;
+ private int line = 0;
+
+ public static void main(String[] args) {
+ try {
+ ProtobufClientRunner runner = new ProtobufClientRunner();
+ runner.invoke(args);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ public void invoke(String[] args) throws Throwable {
+ if (args.length < 1) {
+ invoke(new InputStreamReader(System.in), this, () -> System.out.print(">
"), (str) -> {
+ });
+ } else {
+ final File file = new File(args[0]);
+ if (file.exists() && !file.isDirectory()) {
+ FilePrompterAndTranscriber filePrompterAndTranscriber =
+ new FilePrompterAndTranscriber(file);
+ invoke(new FileReader(file), this, filePrompterAndTranscriber,
filePrompterAndTranscriber);
+ } else {
+ StringBuilder builder = new StringBuilder();
+ for (String arg : args) {
+ if (arg.contains(" ") || arg.contains("\t")) {
+ arg = arg.replace(" ", "\\ ");
+ arg = arg.replace("\t", "\\\t");
+ }
+ if (0 < builder.length()) {
+ builder.append(' ');
+ }
+ builder.append(arg);
+ }
+ invoke(new StringReader(builder.toString()), this);
+ }
+ }
+ }
+
+ public void quit() {
+ System.exit(0);
+ }
+
+ public void addLocator(String host, int port) {
+ cacheFactory = new CacheFactory().addLocator(host, port);
+ }
+
+ public void getAvailableServers() throws Exception {
+ if (cacheFactory == null) {
+ throw new Exception("No locator");
+ }
+
+ for (CacheFactory.ServerInfo server : cacheFactory.getAvailableServers()) {
+ System.out.println(server.host + "[" + server.port + "]");
+ }
+ }
+
+ public void connect() throws Exception {
+ cache = cacheFactory.connect();
+ }
+
+ public void getRegion(String name) {
+ region = cache.getRegion(name);
+ }
+}
diff --git
a/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/Region.java
b/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/Region.java
new file mode 100644
index 0000000000..6824c8c666
--- /dev/null
+++
b/geode-protobuf-client/src/main/java/org/apache/geode/internal/cache/client/protobuf/Region.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.client.protobuf;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
+
+@Experimental
+public class Region<K, V> {
+ final String name;
+ final Socket socket;
+
+ BasicTypes.EncodedValue encodeValue(Object unencodedValue) {
+ BasicTypes.EncodedValue.Builder builder =
BasicTypes.EncodedValue.newBuilder();
+ if (Integer.class.equals(unencodedValue.getClass())) {
+ builder.setIntResult((Integer) unencodedValue);
+ } else if (Long.class.equals(unencodedValue.getClass())) {
+ builder.setLongResult((Long) unencodedValue);
+ } else if (Short.class.equals(unencodedValue.getClass())) {
+ builder.setShortResult((Short) unencodedValue);
+ } else if (Byte.class.equals(unencodedValue.getClass())) {
+ builder.setByteResult((Byte) unencodedValue);
+ } else if (Double.class.equals(unencodedValue.getClass())) {
+ builder.setDoubleResult((Double) unencodedValue);
+ } else if (Float.class.equals(unencodedValue.getClass())) {
+ builder.setFloatResult((Float) unencodedValue);
+ } else if (byte[].class.equals(unencodedValue.getClass())) {
+ builder.setBinaryResult(ByteString.copyFrom((byte[]) unencodedValue));
+ } else if (Boolean.class.equals(unencodedValue.getClass())) {
+ builder.setBooleanResult((Boolean) unencodedValue);
+ } else if (String.class.equals(unencodedValue.getClass())) {
+ builder.setStringResult((String) unencodedValue);
+ }
+ return builder.build();
+ }
+
+ Object decodeValue(BasicTypes.EncodedValue encodedValue) {
+ switch (encodedValue.getValueCase()) {
+ case BINARYRESULT:
+ return encodedValue.getBinaryResult().toByteArray();
+ case BOOLEANRESULT:
+ return encodedValue.getBooleanResult();
+ case BYTERESULT:
+ return (byte) encodedValue.getByteResult();
+ case DOUBLERESULT:
+ return encodedValue.getDoubleResult();
+ case FLOATRESULT:
+ return encodedValue.getFloatResult();
+ case INTRESULT:
+ return encodedValue.getIntResult();
+ case LONGRESULT:
+ return encodedValue.getLongResult();
+ case SHORTRESULT:
+ return (short) encodedValue.getShortResult();
+ case STRINGRESULT:
+ return encodedValue.getStringResult();
+ default:
+ return null;
+ }
+ }
+
+ BasicTypes.Entry encodeEntry(Object unencodedKey, Object unencodedValue) {
+ if (unencodedValue == null) {
+ return
BasicTypes.Entry.newBuilder().setKey(encodeValue(unencodedKey)).build();
+ }
+ return BasicTypes.Entry.newBuilder().setKey(encodeValue(unencodedKey))
+ .setValue(encodeValue(unencodedValue)).build();
+ }
+
+ Region(String name, Socket socket) {
+ this.name = name;
+ this.socket = socket;
+ }
+
+ public V get(K key) throws Exception {
+ final OutputStream outputStream = socket.getOutputStream();
+ ClientProtocol.Message.newBuilder()
+
.setRequest(ClientProtocol.Request.newBuilder().setGetRequest(RegionAPI.GetRequest
+
.newBuilder().setRegionName(name).setKey(encodeValue(key.toString()))))
+ .build().writeDelimitedTo(outputStream);
+
+ // TODO: How does one get a java.lang.Object out of
+ // org.apache.geode.internal.protocol.protobuf.v1.EncodedValue?
+ final InputStream inputStream = socket.getInputStream();
+ return (V)
ClientProtocol.Message.parseDelimitedFrom(inputStream).getResponse().getGetResponse()
+ .getResult().getStringResult();
+ }
+
+ public Map<K, V> getAll(Collection<K> keys) throws Exception {
+ Map<K, V> values = new HashMap<>();
+
+ final OutputStream outputStream = socket.getOutputStream();
+ RegionAPI.GetAllRequest.Builder getAllRequest =
RegionAPI.GetAllRequest.newBuilder();
+ getAllRequest.setRegionName(name);
+ for (K key : keys) {
+ getAllRequest.addKey(encodeValue(key.toString()));
+ }
+ ClientProtocol.Message.newBuilder()
+
.setRequest(ClientProtocol.Request.newBuilder().setGetAllRequest(getAllRequest)).build()
+ .writeDelimitedTo(outputStream);
+
+ final InputStream inputStream = socket.getInputStream();
+ final RegionAPI.GetAllResponse getAllResponse =
+
ClientProtocol.Message.parseDelimitedFrom(inputStream).getResponse().getGetAllResponse();
+ for (BasicTypes.Entry entry : getAllResponse.getEntriesList()) {
+ // TODO: How does one get a java.lang.Object out of
+ // org.apache.geode.internal.protocol.protobuf.v1.EncodedValue?
+ // TODO values.put((K) entry.getKey().getStringResult(), (V)
+ // entry.getValue().getStringResult());
+ }
+
+ return values;
+ }
+
+ public void put(K key, V value) throws Exception {
+ final OutputStream outputStream = socket.getOutputStream();
+ ClientProtocol.Message.newBuilder()
+ .setRequest(ClientProtocol.Request.newBuilder()
+
.setPutRequest(RegionAPI.PutRequest.newBuilder().setRegionName(name)
+ .setEntry(encodeEntry(key.toString(), value))))
+ .build().writeDelimitedTo(outputStream);
+
+ final InputStream inputStream = socket.getInputStream();
+
ClientProtocol.Message.parseDelimitedFrom(inputStream).getResponse().getPutResponse();
+ }
+
+ public void putAll(Map<K, V> values) throws Exception {
+ final OutputStream outputStream = socket.getOutputStream();
+ RegionAPI.PutAllRequest.Builder putAllRequest =
RegionAPI.PutAllRequest.newBuilder();
+ putAllRequest.setRegionName(name);
+ for (K key : values.keySet()) {
+ putAllRequest.addEntry(encodeEntry(key, values.get(key)));
+ }
+ ClientProtocol.Message.newBuilder()
+
.setRequest(ClientProtocol.Request.newBuilder().setPutAllRequest(putAllRequest)).build()
+ .writeDelimitedTo(outputStream);
+
+ final InputStream inputStream = socket.getInputStream();
+ final RegionAPI.PutAllResponse putAllResponse =
+
ClientProtocol.Message.parseDelimitedFrom(inputStream).getResponse().getPutAllResponse();
+ if (0 < putAllResponse.getFailedKeysCount()) {
+ StringBuilder builder = new StringBuilder();
+ for (BasicTypes.KeyedError keyedError :
putAllResponse.getFailedKeysList()) {
+ if (0 < builder.length()) {
+ builder.append(", ");
+ }
+ builder.append(decodeValue(keyedError.getKey()).toString());
+ }
+ throw new Exception("Unable to put the following keys: " +
builder.toString());
+ }
+ }
+
+ public void remove(K key) throws Exception {
+ final OutputStream outputStream = socket.getOutputStream();
+ ClientProtocol.Message.newBuilder()
+
.setRequest(ClientProtocol.Request.newBuilder().setRemoveRequest(RegionAPI.RemoveRequest
+
.newBuilder().setRegionName(name).setKey(encodeValue(key.toString()))))
+ .build().writeDelimitedTo(outputStream);
+
+ final InputStream inputStream = socket.getInputStream();
+
ClientProtocol.Message.parseDelimitedFrom(inputStream).getResponse().getRemoveResponse();
+ }
+
+ public void removeAll(Collection<K> keys) throws Exception {
+ final OutputStream outputStream = socket.getOutputStream();
+ RegionAPI.RemoveAllRequest.Builder removeAllRequest =
RegionAPI.RemoveAllRequest.newBuilder();
+ removeAllRequest.setRegionName(name);
+ for (K key : keys) {
+ removeAllRequest.addKey(encodeValue(key.toString()));
+ }
+ ClientProtocol.Message.newBuilder()
+
.setRequest(ClientProtocol.Request.newBuilder().setRemoveAllRequest(removeAllRequest))
+ .build().writeDelimitedTo(outputStream);
+
+ final InputStream inputStream = socket.getInputStream();
+ if
(!ClientProtocol.Message.parseDelimitedFrom(inputStream).getResponse().getRemoveAllResponse()
+ .getSuccess()) {
+ throw new Exception("Unable to remove all entries");
+ }
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index a319689567..2e59007640 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -39,6 +39,7 @@ include 'extensions/geode-modules-session-internal'
include 'extensions/geode-modules-session'
include 'extensions/geode-modules-assembly'
include 'geode-protobuf'
+include 'geode-protobuf-client'
include 'geode-protobuf-messages'
include 'extensions/session-testing-war'
include 'geode-concurrency-test'
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Create module for Protobuf message-based client
> -----------------------------------------------
>
> Key: GEODE-4054
> URL: https://issues.apache.org/jira/browse/GEODE-4054
> Project: Geode
> Issue Type: Improvement
> Components: client/server
> Reporter: Michael Dodge
>
> Create a module, geode-protobuf-client, that contains a simple Java client
> that exercises the Protobuf messages and the new protocol. This client should
> allow the interaction with a locator and cache server based on command-line
> arguments, a file of commands, or an interactive shell.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)