http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestMemcacheProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestMemcacheProtocolSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestMemcacheProtocolSelfTest.java new file mode 100644 index 0000000..b73fabd --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestMemcacheProtocolSelfTest.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest; + +import junit.framework.*; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * TCP protocol test. + */ +@SuppressWarnings("unchecked") +public class GridRestMemcacheProtocolSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + private static final String HOST = "127.0.0.1"; + + /** */ + private static final int PORT = 11212; + + /** */ + private GridTestMemcacheClient client; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + client = client(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + client.shutdown(); + + grid().cache(null).clearAll(); + grid().cache(CACHE_NAME).clearAll(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost(HOST); + + assert cfg.getClientConnectionConfiguration() == null; + + ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration(); + + clientCfg.setRestTcpPort(PORT); + + cfg.setClientConnectionConfiguration(clientCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration(CACHE_NAME)); + + return cfg; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + * @throws Exception In case of error. + */ + private CacheConfiguration cacheConfiguration(@Nullable String cacheName) throws Exception { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setCacheMode(LOCAL); + cfg.setName(cacheName); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + return cfg; + } + + /** + * @return Client. + * @throws IgniteCheckedException In case of error. + */ + private GridTestMemcacheClient client() throws IgniteCheckedException { + return new GridTestMemcacheClient(HOST, PORT); + } + + /** + * @throws Exception If failed. + */ + public void testPut() throws Exception { + assertTrue(client.cachePut(null, "key1", "val1")); + assertEquals("val1", grid().cache(null).get("key1")); + + assertTrue(client.cachePut(CACHE_NAME, "key1", "val1")); + assertEquals("val1", grid().cache(CACHE_NAME).get("key1")); + } + + /** + * @throws Exception If failed. + */ + public void testGet() throws Exception { + assertTrue(grid().cache(null).putx("key", "val")); + + Assert.assertEquals("val", client.cacheGet(null, "key")); + + assertTrue(grid().cache(CACHE_NAME).putx("key", "val")); + + Assert.assertEquals("val", client.cacheGet(CACHE_NAME, "key")); + } + + /** + * @throws Exception If failed. + */ + public void testRemove() throws Exception { + assertTrue(grid().cache(null).putx("key", "val")); + + assertTrue(client.cacheRemove(null, "key")); + assertFalse(client.cacheRemove(null, "wrongKey")); + + assertNull(grid().cache(null).get("key")); + + assertTrue(grid().cache(CACHE_NAME).putx("key", "val")); + + assertTrue(client.cacheRemove(CACHE_NAME, "key")); + assertFalse(client.cacheRemove(CACHE_NAME, "wrongKey")); + + assertNull(grid().cache(CACHE_NAME).get("key")); + } + + /** + * @throws Exception If failed. + */ + public void testAdd() throws Exception { + assertTrue(client.cacheAdd(null, "key", "val")); + assertEquals("val", grid().cache(null).get("key")); + assertFalse(client.cacheAdd(null, "key", "newVal")); + assertEquals("val", grid().cache(null).get("key")); + + assertTrue(client.cacheAdd(CACHE_NAME, "key", "val")); + assertEquals("val", grid().cache(CACHE_NAME).get("key")); + assertFalse(client.cacheAdd(CACHE_NAME, "key", "newVal")); + assertEquals("val", grid().cache(CACHE_NAME).get("key")); + } + + /** + * @throws Exception If failed. + */ + public void testReplace() throws Exception { + assertFalse(client.cacheReplace(null, "key1", "val1")); + assertTrue(grid().cache(null).putx("key1", "val1")); + assertTrue(client.cacheReplace(null, "key1", "val2")); + + assertFalse(client.cacheReplace(null, "key2", "val1")); + assertTrue(grid().cache(null).putx("key2", "val1")); + assertTrue(client.cacheReplace(null, "key2", "val2")); + + grid().cache(null).clearAll(); + + assertFalse(client.cacheReplace(CACHE_NAME, "key1", "val1")); + assertTrue(grid().cache(CACHE_NAME).putx("key1", "val1")); + assertTrue(client.cacheReplace(CACHE_NAME, "key1", "val2")); + } + + /** + * @throws Exception If failed. + */ + public void testMetrics() throws Exception { + grid().cache(null).resetMetrics(); + grid().cache(CACHE_NAME).resetMetrics(); + + grid().cache(null).putx("key1", "val"); + grid().cache(null).putx("key2", "val"); + grid().cache(null).putx("key2", "val"); + + grid().cache(null).get("key1"); + grid().cache(null).get("key2"); + grid().cache(null).get("key2"); + + grid().cache(CACHE_NAME).putx("key1", "val"); + grid().cache(CACHE_NAME).putx("key2", "val"); + grid().cache(CACHE_NAME).putx("key2", "val"); + + grid().cache(CACHE_NAME).get("key1"); + grid().cache(CACHE_NAME).get("key2"); + grid().cache(CACHE_NAME).get("key2"); + + Map<String, Long> m = client.cacheMetrics(null); + + assertNotNull(m); + assertEquals(7, m.size()); + assertEquals(3, m.get("reads").longValue()); + assertEquals(3, m.get("writes").longValue()); + + m = client.cacheMetrics(CACHE_NAME); + + assertNotNull(m); + assertEquals(7, m.size()); + assertEquals(3, m.get("reads").longValue()); + assertEquals(3, m.get("writes").longValue()); + } + + /** + * @throws Exception If failed. + */ + public void testIncrement() throws Exception { + assertEquals(15L, client().cacheIncrement(null, "key", 10L, 5L)); + assertEquals(15L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); + assertEquals(18L, client().cacheIncrement(null, "key", 20L, 3L)); + assertEquals(18L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); + assertEquals(20L, client().cacheIncrement(null, "key", null, 2L)); + assertEquals(20L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); + + assertEquals(15L, client().cacheIncrement(CACHE_NAME, "key", 10L, 5L)); + assertEquals(15L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); + assertEquals(18L, client().cacheIncrement(CACHE_NAME, "key", 20L, 3L)); + assertEquals(18L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); + assertEquals(20L, client().cacheIncrement(CACHE_NAME, "key", null, 2L)); + assertEquals(20L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); + } + + /** + * @throws Exception If failed. + */ + public void testDecrement() throws Exception { + assertEquals(15L, client().cacheDecrement(null, "key", 20L, 5L)); + assertEquals(15L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); + assertEquals(12L, client().cacheDecrement(null, "key", 20L, 3L)); + assertEquals(12L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); + assertEquals(10L, client().cacheDecrement(null, "key", null, 2L)); + assertEquals(10L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); + + assertEquals(15L, client().cacheDecrement(CACHE_NAME, "key", 20L, 5L)); + assertEquals(15L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); + assertEquals(12L, client().cacheDecrement(CACHE_NAME, "key", 20L, 3L)); + assertEquals(12L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); + assertEquals(10L, client().cacheDecrement(CACHE_NAME, "key", null, 2L)); + assertEquals(10L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); + } + + /** + * @throws Exception If failed. + */ + public void testAppend() throws Exception { + assertFalse(client.cacheAppend(null, "wrongKey", "_suffix")); + assertFalse(client.cacheAppend(CACHE_NAME, "wrongKey", "_suffix")); + + assertTrue(grid().cache(null).putx("key", "val")); + assertTrue(client.cacheAppend(null, "key", "_suffix")); + assertEquals("val_suffix", grid().cache(null).get("key")); + + assertTrue(grid().cache(CACHE_NAME).putx("key", "val")); + assertTrue(client.cacheAppend(CACHE_NAME, "key", "_suffix")); + assertEquals("val_suffix", grid().cache(CACHE_NAME).get("key")); + } + + /** + * @throws Exception If failed. + */ + public void testPrepend() throws Exception { + assertFalse(client.cachePrepend(null, "wrongKey", "prefix_")); + assertFalse(client.cachePrepend(CACHE_NAME, "wrongKey", "prefix_")); + + assertTrue(grid().cache(null).putx("key", "val")); + assertTrue(client.cachePrepend(null, "key", "prefix_")); + assertEquals("prefix_val", grid().cache(null).get("key")); + + assertTrue(grid().cache(CACHE_NAME).putx("key", "val")); + assertTrue(client.cachePrepend(CACHE_NAME, "key", "prefix_")); + assertEquals("prefix_val", grid().cache(CACHE_NAME).get("key")); + } + + /** + * @throws Exception If failed. + */ + public void testVersion() throws Exception { + assertNotNull(client.version()); + } + + /** + * @throws Exception If failed. + */ + public void testNoop() throws Exception { + client.noop(); + } + + /** + * @throws Exception If failed. + */ + public void testQuit() throws Exception { + client.quit(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorMultiStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorMultiStartSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorMultiStartSelfTest.java new file mode 100644 index 0000000..c350305 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorMultiStartSelfTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest; + +import org.apache.ignite.configuration.*; +import org.gridgain.testframework.junits.common.*; + +/** + * Rest processor test. + */ +public class GridRestProcessorMultiStartSelfTest extends GridCommonAbstractTest { + /** */ + private static final int GRID_CNT = 3; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setRestEnabled(true); + + return cfg; + } + + /** + * Test that multiple nodes can start with JETTY enabled. + * + * @throws Exception If failed. + */ + public void testMultiStart() throws Exception { + try { + for (int i = 0; i < GRID_CNT; i++) + startGrid(i); + + stopGrid(0); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorStartSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorStartSelfTest.java new file mode 100644 index 0000000..619b7ee --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorStartSelfTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.client.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * + */ +public class GridRestProcessorStartSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String HOST = "127.0.0.1"; + + /** */ + public static final int TCP_PORT = 11222; + + /** */ + private CountDownLatch gridReady; + + /** */ + private CountDownLatch proceed; + + /** {@inheritDoc}*/ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost(HOST); + + assert cfg.getClientConnectionConfiguration() == null; + + ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration(); + + clientCfg.setRestTcpPort(TCP_PORT); + + cfg.setClientConnectionConfiguration(clientCfg); + + TestDiscoverySpi disc = new TestDiscoverySpi(); + + disc.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disc); + + return cfg; + } + + /** {@inheritDoc}*/ + @Override protected void beforeTest() throws Exception { + gridReady = new CountDownLatch(1); + proceed = new CountDownLatch(1); + } + + /** {@inheritDoc}*/ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testTcpStart() throws Exception { + GridClientConfiguration clCfg = new GridClientConfiguration(); + + clCfg.setProtocol(GridClientProtocol.TCP); + clCfg.setServers(Collections.singleton(HOST + ":" + TCP_PORT)); + + doTest(clCfg); + } + + /** + * @param cfg Client configuration. + * @throws Exception If failed. + */ + private void doTest(final GridClientConfiguration cfg) throws Exception { + GridTestUtils.runAsync(new IgniteCallable<Object>() { + @Override public Object call() { + try { + startGrid(); + } + catch (Exception e) { + log().error("Grid start failed", e); + + fail(); + } + + return null; + } + }); + + try { + gridReady.await(); + + IgniteFuture<GridClient> c = GridTestUtils.runAsync(new Callable<GridClient>() { + @Override public GridClient call() throws Exception { + return GridClientFactory.start(cfg); + } + }); + + try { + proceed.countDown(); + + c.get().compute().refreshTopology(false, false); + } + finally { + GridClientFactory.stopAll(); + } + } + catch (Throwable e) { + e.printStackTrace(); + } + finally { + proceed.countDown(); + } + } + + /** + * Test SPI. + */ + private class TestDiscoverySpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + gridReady.countDown(); + + try { + proceed.await(); + } + catch (InterruptedException e) { + throw new IgniteSpiException("Failed to await start signal.", e); + } + + super.spiStart(gridName); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorTest.java new file mode 100644 index 0000000..5877628 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridRestProcessorTest.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import javax.swing.*; +import java.io.*; +import java.util.*; + +/** + * Rest processor test. + * <p> + * URLs to test: + * <ul> + * <li>http://localhost:8080/gridgain?cmd=get&key=simpleBean</li> + * <li>http://localhost:8080/gridgain?cmd=get&key=complexBean</li> + * <li>http://localhost:8080/gridgain?cmd=get&key=list</li> + * <li>http://localhost:8080/gridgain?cmd=get&key=map</li> + * <li>http://localhost:8080/gridgain?cmd=get&key=int</li> + * <li>http://localhost:8080/gridgain?cmd=get&key=string</li> + * <li>http://localhost:8080/gridgain?cmd=get&key=date</li> + * <li>http://localhost:8080/gridgain?cmd=top</li> + * <li>http://localhost:8080/gridgain?cmd=exe&name=org.apache.ignite.internal.processors.rest.TestTask2</li> + * <li>http://localhost:8080/gridgain?cmd=exe&name=org.apache.ignite.internal.processors.rest.TestTask2&async=true</li> + * <li>http://localhost:8080/gridgain?cmd=res&id=XXXX</li> + * </ul> + */ +public class GridRestProcessorTest extends GridCommonAbstractTest { + /** Counter */ + private static int cntr; + + /** */ + public GridRestProcessorTest() { + super(/*start grid*/false); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + + /** + * @throws Exception If failed. + */ + public void testRest() throws Exception { + IgniteConfiguration cfg = getConfiguration((String)null); + + cfg = cacheTestConfiguration(cfg); + + G.start(cfg); + + populateCache(); + + deployTasks(); + + // Wait until Ok is pressed. + JOptionPane.showMessageDialog( + null, + new JComponent[] { + new JLabel("GridGain started."), + new JLabel( + "<html>" + + "You can use JMX console at <u>http://localhost:1234</u>" + + "</html>"), + new JLabel("Press OK to stop GridGain.") + }, + "GridGain Startup JUnit", + JOptionPane.INFORMATION_MESSAGE + ); + + G.stop(true); + } + + /** + * @param cfg Initial configuration. + * @return Final configuration. + */ + @SuppressWarnings({"unchecked"}) + private IgniteConfiguration cacheTestConfiguration(IgniteConfiguration cfg) { + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); + + ipFinder.setShared(true); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + assert cfg.getClientConnectionConfiguration() == null; + + ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration(); + + // Ensure - no authentication. + clientCfg.setRestSecretKey(null); + + cfg.setClientConnectionConfiguration(clientCfg); + + cfg.setCacheConfiguration(defaultCacheConfiguration()); + + return cfg; + } + + /** + * @return Integer. + */ + private int intValue() { + return ++cntr; + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void populateCache() throws IgniteCheckedException { + GridCache<String, Object> cache = G.ignite().cache(null); + + cache.put("int", intValue()); + cache.put("string", "cacheString"); + cache.put("date", new Date()); + cache.put("list", createCollection()); + cache.put("map", createMap()); + cache.put("simpleBean", new SimpleBean()); + + ComplexBean bean = new ComplexBean(new SimpleBean(intValue(), "complexSimpleString")); + + bean.setComplexBean(new ComplexBean(new SimpleBean(intValue(), "complexComplexString"))); + + cache.put("complexBean", bean); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void deployTasks() throws IgniteCheckedException { + G.ignite().compute().localDeployTask(TestTask1.class, TestTask1.class.getClassLoader()); + G.ignite().compute().localDeployTask(TestTask2.class, TestTask2.class.getClassLoader()); + } + + /** + * @return Map. + */ + private Map<?, ?> createMap() { + Map<Object, Object> map = new HashMap<>(); + + map.put("intValue", intValue()); + map.put("stringValue", "mapString"); + map.put("simpleBean", new SimpleBean()); + map.put("complexBean", new ComplexBean(new SimpleBean(intValue(), "mapSimpleComplexString"))); + + Map<Object, Object> nested = new HashMap<>(); + + nested.put("intValue", intValue()); + nested.put("stringValue", "nestedMapString"); + nested.put("simpleBean", new SimpleBean()); + nested.put("complexBean", new ComplexBean(new SimpleBean(intValue(), "mapSimpleComplexNestedString"))); + + map.put("nestedMap", nested); + + return map; + } + + /** + * @return List. + */ + private Collection<?> createCollection() { + Collection<Object> list = new ArrayList<>(); + + list.add(intValue()); + list.add("listString"); + list.add(new Date()); + + Collection<Object> nested = new ArrayList<>(); + + nested.add(intValue()); + nested.add("nestedListString"); + nested.add(new Date()); + + list.add(nested); + + return list; + } + + /** + * Simple bean. + */ + @SuppressWarnings( {"ReturnOfDateField", "AssignmentToDateFieldFromParameter", "PublicInnerClass"}) + public static class SimpleBean implements Serializable { + /** */ + private int intField = 12345; + + /** */ + private String strField = "testString"; + + /** */ + private Date date = new Date(); + + /** + * Empty constructor. + */ + private SimpleBean() { + // No-op. + } + + /** + * @param intField Int value. + * @param strField String value. + */ + private SimpleBean(int intField, String strField) { + this.intField = intField; + this.strField = strField; + } + + /** + * @param intField Int value. + * @param strField String value. + * @param date Date value. + */ + private SimpleBean(int intField, String strField, Date date) { + this.intField = intField; + this.strField = strField; + this.date = date; + } + + /** + * @return Int value. + */ + public int getIntField() { + return intField; + } + + /** + * @param intField Int value. + */ + public void setIntField(int intField) { + this.intField = intField; + } + + /** + * @return String value. + */ + public String getStringField() { + return strField; + } + + /** + * @param strField String value. + */ + public void setStringField(String strField) { + this.strField = strField; + } + + /** + * @return Date value. + */ + public Date getDate() { + return date; + } + + /** + * @param date Date value. + */ + public void setDate(Date date) { + this.date = date; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SimpleBean.class, this); + } + } + + /** + * Simple bean. + */ + @SuppressWarnings( {"ReturnOfDateField", "PublicInnerClass"}) + public static class ComplexBean extends SimpleBean { + /** */ + private SimpleBean simpleBean = new SimpleBean(67890, "nestedTestString", new Date()); + + /** */ + private ComplexBean complexBean; + + /** + * @param simpleBean Simple bean. + */ + private ComplexBean(SimpleBean simpleBean) { + this.simpleBean = simpleBean; + } + + /** + * @return Simple bean. + */ + public SimpleBean getSimpleBean() { + return simpleBean; + } + + /** + * @param simpleBean Simple bean. + */ + public void setSimpleBean(SimpleBean simpleBean) { + this.simpleBean = simpleBean; + } + + /** + * @return Complex bean. + */ + public ComplexBean getComplexBean() { + return complexBean; + } + + /** + * @param complexBean Complex bean. + */ + public void setComplexBean(ComplexBean complexBean) { + this.complexBean = complexBean; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ComplexBean.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTaskCommandHandlerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTaskCommandHandlerSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTaskCommandHandlerSelfTest.java new file mode 100644 index 0000000..4ac738a --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTaskCommandHandlerSelfTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.client.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.rest.handlers.*; +import org.apache.ignite.internal.processors.rest.handlers.task.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.client.GridClientProtocol.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Test for {@code GridTaskCommandHandler} + */ +public class GridTaskCommandHandlerSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + public static final String HOST = "127.0.0.1"; + + /** */ + public static final int BINARY_PORT = 11212; + + /** */ + private static final int MAX_TASK_RESULTS = 10; + + /** */ + private GridClient client; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + System.setProperty(IgniteSystemProperties.GG_REST_MAX_TASK_RESULTS, String.valueOf(MAX_TASK_RESULTS)); + + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + System.clearProperty(IgniteSystemProperties.GG_REST_MAX_TASK_RESULTS); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + client = GridClientFactory.start(clientConfiguration()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + GridClientFactory.stop(client.id()); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost(HOST); + + assert cfg.getClientConnectionConfiguration() == null; + + ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration(); + + clientCfg.setRestTcpPort(BINARY_PORT); + + cfg.setClientConnectionConfiguration(clientCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration("replicated"), + cacheConfiguration("partitioned"), cacheConfiguration(CACHE_NAME)); + + return cfg; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + * @throws Exception In case of error. + */ + private CacheConfiguration cacheConfiguration(@Nullable String cacheName) throws Exception { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setCacheMode(cacheName == null || CACHE_NAME.equals(cacheName) ? LOCAL : "replicated".equals(cacheName) ? + REPLICATED : PARTITIONED); + cfg.setName(cacheName); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + return cfg; + } + + /** + * @return Client configuration. + */ + private GridClientConfiguration clientConfiguration() { + GridClientConfiguration cfg = new GridClientConfiguration(); + + GridClientDataConfiguration nullCache = new GridClientDataConfiguration(); + + GridClientDataConfiguration cache = new GridClientDataConfiguration(); + + cache.setName(CACHE_NAME); + + cfg.setDataConfigurations(Arrays.asList(nullCache, cache)); + + cfg.setProtocol(TCP); + cfg.setServers(Arrays.asList("localhost:" + BINARY_PORT)); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testManyTasksRun() throws Exception { + GridClientCompute compute = client.compute(); + + for (int i = 0; i < 1000; i++) + assertEquals("executing".length(), compute.execute(TestTask.class.getName(), "executing")); + + GridClientFactory.stop(client.id(), true); + + GridKernal g = (GridKernal)grid(0); + + Map<GridRestCommand, GridRestCommandHandler> handlers = U.field(g.context().rest(), "handlers"); + + GridTaskCommandHandler taskHnd = (GridTaskCommandHandler)F.find(handlers.values(), null, + new P1<GridRestCommandHandler>() { + @Override public boolean apply(GridRestCommandHandler e) { + return e instanceof GridTaskCommandHandler; + } + }); + + assertNotNull("GridTaskCommandHandler was not found", taskHnd); + + ConcurrentLinkedHashMap taskDesc = U.field(taskHnd, "taskDescs"); + + assertTrue("Task result map size exceeded max value [mapSize=" + taskDesc.sizex() + ", " + + "maxSize=" + MAX_TASK_RESULTS + ']', taskDesc.sizex() <= MAX_TASK_RESULTS); + } + + /** + * Test task. + */ + private static class TestTask extends ComputeTaskSplitAdapter<String, Integer> { + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, final String arg) throws IgniteCheckedException { + return Collections.singletonList(new ComputeJobAdapter() { + @Override public Object execute() { + try { + Thread.sleep(10); + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + + return arg.length(); + } + }); + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + int sum = 0; + + for (ComputeJobResult res : results) + sum += res.<Integer>getData(); + + return sum; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTestBinaryClient.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTestBinaryClient.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTestBinaryClient.java new file mode 100644 index 0000000..9efdcc9 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTestBinaryClient.java @@ -0,0 +1,651 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.client.marshaller.*; +import org.apache.ignite.client.marshaller.optimized.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.nio.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.rest.client.message.GridClientCacheRequest.GridCacheOperation.*; + +/** + * Test client. + */ +final class GridTestBinaryClient { + /** Logger. */ + private final IgniteLogger log = new IgniteJavaLogger(); + + /** Marshaller. */ + private final GridClientMarshaller marsh = new GridClientOptimizedMarshaller(); + + /** Socket. */ + private final Socket sock; + + /** Socket input stream. */ + private final InputStream input; + + /** Opaque counter. */ + private final AtomicInteger idCntr = new AtomicInteger(0); + + /** Response queue. */ + private final BlockingQueue<Response> queue = new LinkedBlockingQueue<>(); + + /** Socket reader. */ + private final Thread rdr; + + /** Quit response. */ + private static final Response QUIT_RESP = new Response(0, GridRestResponse.STATUS_FAILED, null, null); + + /** Random client id. */ + private UUID id = UUID.randomUUID(); + + /** + * Creates client. + * + * @param host Hostname. + * @param port Port number. + * @throws IgniteCheckedException In case of error. + */ + GridTestBinaryClient(String host, int port) throws IgniteCheckedException { + assert host != null; + assert port > 0; + + try { + sock = new Socket(host, port); + + input = sock.getInputStream(); + + GridClientHandshakeRequest req = new GridClientHandshakeRequest(); + + req.marshallerId(GridClientOptimizedMarshaller.ID); + + // Write handshake. + sock.getOutputStream().write(GridClientHandshakeRequestWrapper.HANDSHAKE_HEADER); + sock.getOutputStream().write(req.rawBytes()); + + byte[] buf = new byte[1]; + + // Wait for handshake response. + int read = input.read(buf); + + assert read == 1 : read; + + assert buf[0] == GridClientHandshakeResponse.OK.resultCode() : + "Client handshake failed [code=" + buf[0] + ']'; + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to establish connection.", e); + } + + // Start socket reader thread. + rdr = new Thread(new Runnable() { + @SuppressWarnings("InfiniteLoopStatement") + @Override public void run() { + try { + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + int len = 0; + + boolean running = true; + + while (running) { + // Header. + int symbol = input.read(); + + if (symbol == -1) + break; + + if ((byte)symbol != (byte)0x90) { + if (log.isDebugEnabled()) + log.debug("Failed to parse incoming packet (invalid packet start): " + + Integer.toHexString(symbol & 0xFF)); + + break; + } + + // Packet. + while (true) { + symbol = input.read(); + + if (symbol == -1) { + running = false; + + break; + } + + byte b = (byte)symbol; + + buf.write(b); + + if (len == 0) { + if (buf.size() == 4) { + len = U.bytesToInt(buf.toByteArray(), 0); + + if (log.isInfoEnabled()) + log.info("Read length: " + len); + + buf.reset(); + } + } + else { + if (buf.size() == len) { + byte[] bytes = buf.toByteArray(); + byte[] hdrBytes = Arrays.copyOfRange(bytes, 0, 40); + byte[] msgBytes = Arrays.copyOfRange(bytes, 40, bytes.length); + + GridClientResponse msg = marsh.unmarshal(msgBytes); + + long reqId = GridClientByteUtils.bytesToLong(hdrBytes, 0); + UUID clientId = GridClientByteUtils.bytesToUuid(hdrBytes, 8); + UUID destId = GridClientByteUtils.bytesToUuid(hdrBytes, 24); + + msg.requestId(reqId); + msg.clientId(clientId); + msg.destinationId(destId); + + buf.reset(); + + len = 0; + + queue.offer(new Response(msg.requestId(), msg.successStatus(), msg.result(), + msg.errorMessage())); + + break; + } + } + } + } + } + catch (IOException e) { + if (!Thread.currentThread().isInterrupted()) + U.error(log, e); + } + finally { + U.closeQuiet(sock); + + queue.add(QUIT_RESP); + } + } + }); + + rdr.start(); + } + + /** {@inheritDoc} */ + public void shutdown() throws IgniteCheckedException { + try { + if (rdr != null) { + rdr.interrupt(); + + U.closeQuiet(sock); + + rdr.join(); + } + } + catch (InterruptedException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Makes request to server and waits for response. + * + * @param msg Message to request, + * @return Response object. + * @throws IgniteCheckedException If request failed. + */ + private Response makeRequest(GridClientMessage msg) throws IgniteCheckedException { + assert msg != null; + + // Send request + try { + sock.getOutputStream().write(createPacket(msg)); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to send packet.", e); + } + + // Wait for response. + while (true) { + try { + // Take response from queue. + Response res = queue.take(); + + if (res == QUIT_RESP) + return res; + + // Check opaque value. + if (res.opaque() == msg.requestId()) { + if (!res.isSuccess() && res.error() != null) + throw new IgniteCheckedException(res.error()); + else + return res; + } + else + // Return response to queue if opaque is incorrect. + queue.add(res); + } + catch (InterruptedException e) { + throw new IgniteCheckedException("Interrupted while waiting for response.", e); + } + } + + } + + /** + * Creates hessian packet from client message. + * + * @param msg Message to be sent. + * @return Raw packet. + * @throws IOException If serialization failed. + */ + private byte[] createPacket(GridClientMessage msg) throws IOException { + msg.clientId(id); + + ByteBuffer res = marsh.marshal(msg, 45); + + ByteBuffer slice = res.slice(); + + slice.put((byte)0x90); + slice.putInt(res.remaining() - 5); + slice.putLong(msg.requestId()); + slice.put(U.uuidToBytes(msg.clientId())); + slice.put(U.uuidToBytes(msg.destinationId())); + + byte[] arr = new byte[res.remaining()]; + + res.get(arr); + + return arr; + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param val Value. + * @return If value was actually put. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> boolean cachePut(@Nullable String cacheName, K key, V val) + throws IgniteCheckedException { + return cachePutAll(cacheName, Collections.singletonMap(key, val)); + } + + /** + * @param cacheName Cache name. + * @param entries Entries. + * @return {@code True} if map contained more then one entry or if put succeeded in case of one entry, + * {@code false} otherwise + * @throws IgniteCheckedException In case of error. + */ + public <K, V> boolean cachePutAll(@Nullable String cacheName, Map<K, V> entries) + throws IgniteCheckedException { + assert entries != null; + + GridClientCacheRequest req = new GridClientCacheRequest(PUT_ALL); + + req.requestId(idCntr.incrementAndGet()); + req.cacheName(cacheName); + req.values((Map<Object, Object>)entries); + + return makeRequest(req).<Boolean>getObject(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @return Value. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> V cacheGet(@Nullable String cacheName, K key) + throws IgniteCheckedException { + assert key != null; + + GridClientCacheRequest req = new GridClientCacheRequest(GET); + + req.requestId(idCntr.getAndIncrement()); + req.cacheName(cacheName); + req.key(key); + + return makeRequest(req).getObject(); + + } + + /** + * @param cacheName Cache name. + * @param keys Keys. + * @return Entries. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> Map<K, V> cacheGetAll(@Nullable String cacheName, K... keys) + throws IgniteCheckedException { + assert keys != null; + + GridClientCacheRequest req = new GridClientCacheRequest(GET_ALL); + + req.requestId(idCntr.getAndIncrement()); + req.cacheName(cacheName); + req.keys((Iterable<Object>)Arrays.asList(keys)); + + return makeRequest(req).getObject(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @return Whether entry was actually removed. + * @throws IgniteCheckedException In case of error. + */ + @SuppressWarnings("unchecked") + public <K> boolean cacheRemove(@Nullable String cacheName, K key) throws IgniteCheckedException { + assert key != null; + + GridClientCacheRequest req = new GridClientCacheRequest(RMV); + + req.requestId(idCntr.getAndIncrement()); + req.cacheName(cacheName); + req.key(key); + + return makeRequest(req).<Boolean>getObject(); + } + + /** + * @param cacheName Cache name. + * @param keys Keys. + * @return Whether entries were actually removed + * @throws IgniteCheckedException In case of error. + */ + public <K> boolean cacheRemoveAll(@Nullable String cacheName, K... keys) + throws IgniteCheckedException { + assert keys != null; + + GridClientCacheRequest req = new GridClientCacheRequest(RMV_ALL); + + req.requestId(idCntr.getAndIncrement()); + req.cacheName(cacheName); + req.keys((Iterable<Object>)Arrays.asList(keys)); + + return makeRequest(req).isSuccess(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param val Value. + * @return Whether value was actually replaced. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> boolean cacheReplace(@Nullable String cacheName, K key, V val) + throws IgniteCheckedException { + assert key != null; + assert val != null; + + GridClientCacheRequest replace = new GridClientCacheRequest(REPLACE); + + replace.requestId(idCntr.getAndIncrement()); + replace.cacheName(cacheName); + replace.key(key); + replace.value(val); + + return makeRequest(replace).<Boolean>getObject(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param val1 Value 1. + * @param val2 Value 2. + * @return Whether new value was actually set. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> boolean cacheCompareAndSet(@Nullable String cacheName, K key, @Nullable V val1, @Nullable V val2) + throws IgniteCheckedException { + assert key != null; + + GridClientCacheRequest msg = new GridClientCacheRequest(CAS); + + msg.requestId(idCntr.getAndIncrement()); + msg.cacheName(cacheName); + msg.key(key); + msg.value(val1); + msg.value2(val2); + + return makeRequest(msg).<Boolean>getObject(); + } + + /** + * @param cacheName Cache name. + * @return Metrics. + * @throws IgniteCheckedException In case of error. + */ + public <K> Map<String, Long> cacheMetrics(@Nullable String cacheName) throws IgniteCheckedException { + GridClientCacheRequest metrics = new GridClientCacheRequest(METRICS); + + metrics.requestId(idCntr.getAndIncrement()); + metrics.cacheName(cacheName); + + return makeRequest(metrics).getObject(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param val Value. + * @return Whether entry was appended. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> boolean cacheAppend(@Nullable String cacheName, K key, V val) + throws IgniteCheckedException { + assert key != null; + assert val != null; + + GridClientCacheRequest add = new GridClientCacheRequest(APPEND); + + add.requestId(idCntr.getAndIncrement()); + add.cacheName(cacheName); + add.key(key); + add.value(val); + + return makeRequest(add).<Boolean>getObject(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param val Value. + * @return Whether entry was prepended. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> boolean cachePrepend(@Nullable String cacheName, K key, V val) + throws IgniteCheckedException { + assert key != null; + assert val != null; + + GridClientCacheRequest add = new GridClientCacheRequest(PREPEND); + + add.requestId(idCntr.getAndIncrement()); + add.cacheName(cacheName); + add.key(key); + add.value(val); + + return makeRequest(add).<Boolean>getObject(); + } + + /** + * @param taskName Task name. + * @param arg Task arguments. + * @return Task execution result. + * @throws IgniteCheckedException In case of error. + */ + public GridClientTaskResultBean execute(String taskName, @Nullable Object arg) throws IgniteCheckedException { + assert !F.isEmpty(taskName); + + GridClientTaskRequest msg = new GridClientTaskRequest(); + + msg.taskName(taskName); + msg.argument(arg); + + return makeRequest(msg).getObject(); + } + + /** + * @param id Node ID. + * @param includeAttrs Whether to include attributes. + * @param includeMetrics Whether to include metrics. + * @return Node. + * @throws IgniteCheckedException In case of error. + */ + public GridClientNodeBean node(UUID id, boolean includeAttrs, boolean includeMetrics) + throws IgniteCheckedException { + assert id != null; + + GridClientTopologyRequest msg = new GridClientTopologyRequest(); + + msg.nodeId(id); + msg.includeAttributes(includeAttrs); + msg.includeMetrics(includeMetrics); + + return makeRequest(msg).getObject(); + } + + /** + * @param ipAddr IP address. + * @param includeAttrs Whether to include attributes. + * @param includeMetrics Whether to include metrics. + * @return Node. + * @throws IgniteCheckedException In case of error. + */ + public GridClientNodeBean node(String ipAddr, boolean includeAttrs, boolean includeMetrics) + throws IgniteCheckedException { + assert !F.isEmpty(ipAddr); + + GridClientTopologyRequest msg = new GridClientTopologyRequest(); + + msg.nodeIp(ipAddr); + msg.includeAttributes(includeAttrs); + msg.includeMetrics(includeMetrics); + + return makeRequest(msg).getObject(); + } + + /** + * @param includeAttrs Whether to include attributes. + * @param includeMetrics Whether to include metrics. + * @return Nodes. + * @throws IgniteCheckedException In case of error. + */ + public List<GridClientNodeBean> topology(boolean includeAttrs, boolean includeMetrics) + throws IgniteCheckedException { + GridClientTopologyRequest msg = new GridClientTopologyRequest(); + + msg.includeAttributes(includeAttrs); + msg.includeMetrics(includeMetrics); + + return makeRequest(msg).getObject(); + } + + /** + * @param path Log file path. + * @return Log file contents. + * @throws IgniteCheckedException In case of error. + */ + public List<String> log(@Nullable String path, int from, int to) throws IgniteCheckedException { + GridClientLogRequest msg = new GridClientLogRequest(); + + msg.requestId(idCntr.getAndIncrement()); + msg.path(path); + msg.from(from); + msg.to(to); + + return makeRequest(msg).getObject(); + } + + /** + * Response data. + */ + private static class Response { + /** Opaque. */ + private final long opaque; + + /** Success flag. */ + private final int success; + + /** Response object. */ + private final Object obj; + + /** Error message, if any */ + private final String error; + + /** + * @param opaque Opaque. + * @param success Success flag. + * @param obj Response object. + * @param error Error message, if any. + */ + Response(long opaque, int success, @Nullable Object obj, @Nullable String error) { + assert opaque >= 0; + + this.opaque = opaque; + this.success = success; + this.obj = obj; + this.error = error; + } + + /** + * @return Opaque. + */ + long opaque() { + return opaque; + } + + /** + * @return Success flag. + */ + boolean isSuccess() { + return success == GridRestResponse.STATUS_SUCCESS; + } + + /** + * @return Response object. + */ + @SuppressWarnings("unchecked") + <T> T getObject() { + return (T)obj; + } + + /** + * @return Error message in case if error occurred. + */ + String error() { + return error; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTestMemcacheClient.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTestMemcacheClient.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTestMemcacheClient.java new file mode 100644 index 0000000..67dfc5a --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/GridTestMemcacheClient.java @@ -0,0 +1,898 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest; + +import org.apache.ignite.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Test client. + */ +final class GridTestMemcacheClient { + /** Header length. */ + private static final short HDR_LEN = 24; + + /** Serialized flag. */ + private static final short SERIALIZED_FLAG = 1; + + /** Boolean flag. */ + private static final short BOOLEAN_FLAG = (1 << 8); + + /** Integer flag. */ + private static final short INT_FLAG = (2 << 8); + + /** Long flag. */ + private static final short LONG_FLAG = (3 << 8); + + /** Date flag. */ + private static final short DATE_FLAG = (4 << 8); + + /** Byte flag. */ + private static final short BYTE_FLAG = (5 << 8); + + /** Float flag. */ + private static final short FLOAT_FLAG = (6 << 8); + + /** Double flag. */ + private static final short DOUBLE_FLAG = (7 << 8); + + /** Byte array flag. */ + private static final short BYTE_ARR_FLAG = (8 << 8); + + /** Logger. */ + private final IgniteLogger log = new IgniteJavaLogger(); + + /** JDK marshaller. */ + private final IgniteMarshaller jdkMarshaller = new IgniteJdkMarshaller(); + + /** Socket. */ + private final Socket sock; + + /** Opaque counter. */ + private final AtomicInteger opaqueCntr = new AtomicInteger(0); + + /** Response queue. */ + private final BlockingQueue<Response> queue = + new LinkedBlockingQueue<>(); + + /** Socket reader. */ + private final Thread rdr; + + /** Quit response. */ + private static final Response QUIT_RESP = new Response(0, false, null, null); + + /** + * Creates client. + * + * @param host Hostname. + * @param port Port number. + * @throws IgniteCheckedException In case of error. + */ + GridTestMemcacheClient(String host, int port) throws IgniteCheckedException { + assert host != null; + assert port > 0; + + try { + sock = new Socket(host, port); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to establish connection.", e); + } + + // Start socket reader thread. + rdr = new Thread(new Runnable() { + @SuppressWarnings("InfiniteLoopStatement") + @Override public void run() { + try { + InputStream in = sock.getInputStream(); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + boolean running = true; + + while (running) { + byte opCode = 0; + byte extrasLength = 0; + int keyLength = 0; + boolean success = false; + int totalLength = 0; + int opaque = 0; + short keyFlags = 0; + short valFlags = 0; + Object obj = null; + Object key = null; + + int i = 0; + + while (true) { + int symbol = in.read(); + + if (symbol == -1) { + running = false; + + break; + } + + byte b = (byte)symbol; + + if (i == 1) + opCode = b; + if (i == 2 || i == 3) { + buf.write(b); + + if (i == 3) { + keyLength = U.bytesToShort(buf.toByteArray(), 0); + + buf.reset(); + } + } + else if (i == 4) + extrasLength = b; + else if (i == 6 || i == 7) { + buf.write(b); + + if (i == 7) { + success = U.bytesToShort(buf.toByteArray(), 0) == 0; + + buf.reset(); + } + } + else if (i >= 8 && i <= 11) { + buf.write(b); + + if (i == 11) { + totalLength = U.bytesToInt(buf.toByteArray(), 0); + + buf.reset(); + } + } + else if (i >= 12 && i <= 15) { + buf.write(b); + + if (i == 15) { + opaque = U.bytesToInt(buf.toByteArray(), 0); + + buf.reset(); + } + } + else if (i >= HDR_LEN && i < HDR_LEN + extrasLength) { + buf.write(b); + + if (i == HDR_LEN + extrasLength - 1) { + byte[] rawFlags = buf.toByteArray(); + + keyFlags = U.bytesToShort(rawFlags, 0); + valFlags = U.bytesToShort(rawFlags, 2); + + buf.reset(); + } + } + else if (i >= HDR_LEN + extrasLength && i < HDR_LEN + extrasLength + keyLength) { + buf.write(b); + + if (i == HDR_LEN + extrasLength + keyLength - 1) { + key = decode(buf.toByteArray(), keyFlags); + + buf.reset(); + } + } + else if (i >= HDR_LEN + extrasLength + keyLength && i < HDR_LEN + totalLength) { + buf.write(b); + + if (opCode == 0x05 || opCode == 0x06) + valFlags = LONG_FLAG; + + if (i == HDR_LEN + totalLength - 1) { + obj = decode(buf.toByteArray(), valFlags); + + buf.reset(); + } + } + + if (i == HDR_LEN + totalLength - 1) { + queue.add(new Response(opaque, success, key, obj)); + + break; + } + + i++; + } + } + } + catch (IOException e) { + if (!Thread.currentThread().isInterrupted()) + U.error(log, e); + } + catch (Exception e) { + U.error(log, e); + } + finally { + U.closeQuiet(sock); + + queue.add(QUIT_RESP); + } + } + }); + + rdr.start(); + } + + /** {@inheritDoc} */ + public void shutdown() throws IgniteCheckedException { + try { + if (rdr != null) { + rdr.interrupt(); + + U.closeQuiet(sock); + + rdr.join(); + } + } + catch (InterruptedException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Makes request to server and waits for response. + * + * @param cmd Command. + * @param cacheName Cache name. + * @param key Key. + * @param val Value. + * @param extras Extras. + * @return Response. + * @throws IgniteCheckedException In case of error. + */ + private Response makeRequest( + Command cmd, + @Nullable String cacheName, + @Nullable Object key, + @Nullable Object val, + @Nullable Long... extras + ) throws IgniteCheckedException { + assert cmd != null; + + int opaque = opaqueCntr.getAndIncrement(); + + // Send request. + try { + sock.getOutputStream().write(createPacket(cmd, cacheName, key, val, opaque, extras)); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to send packet.", e); + } + + // Wait for response. + while (true) { + try { + // Take response from queue. + Response res = queue.take(); + + if (res == QUIT_RESP) + return res; + + // Check opaque value. + if (res.getOpaque() == opaque) { + if (!res.isSuccess() && res.getObject() != null) + throw new IgniteCheckedException((String)res.getObject()); + else + return res; + } + else + // Return response to queue if opaque is incorrect. + queue.add(res); + } + catch (InterruptedException e) { + throw new IgniteCheckedException("Interrupted while waiting for response.", e); + } + } + } + + /** + * Makes request to server and waits for response. + * + * @param cmd Command. + * @param cacheName Cache name. + * @param key Key. + * @param val Value. + * @param extras Extras. + * @return Response. + * @throws IgniteCheckedException In case of error. + */ + private List<Response> makeMultiRequest( + Command cmd, + @Nullable String cacheName, + @Nullable Object key, + @Nullable Object val, + @Nullable Long... extras + ) throws IgniteCheckedException { + assert cmd != null; + + int opaque = opaqueCntr.getAndIncrement(); + + List<Response> resList = new LinkedList<>(); + + // Send request. + try { + sock.getOutputStream().write(createPacket(cmd, cacheName, key, val, opaque, extras)); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to send packet.", e); + } + + // Wait for response. + while (true) { + try { + // Take response from queue. + Response res = queue.take(); + + if (res == QUIT_RESP) + return resList; + + // Check opaque value. + if (res.getOpaque() == opaque) { + if (!res.isSuccess() && res.getObject() != null) + throw new IgniteCheckedException((String)res.getObject()); + else { + if (res.getObject() == null) + return resList; + + resList.add(res); + } + } + else + // Return response to queue if opaque is incorrect. + queue.add(res); + } + catch (InterruptedException e) { + throw new IgniteCheckedException("Interrupted while waiting for response.", e); + } + } + } + + /** + * Creates packet. + * + * @param cmd Command. + * @param cacheName Cache name. + * @param key Key. + * @param val Value. + * @param opaque Opaque. + * @param extras Extras. + * @throws IgniteCheckedException In case of error. + * @return Packet. + */ + private byte[] createPacket( + Command cmd, + @Nullable String cacheName, + @Nullable Object key, + @Nullable Object val, + int opaque, + @Nullable Long[] extras + ) throws IgniteCheckedException { + assert cmd != null; + assert opaque >= 0; + + byte[] cacheNameBytes = cacheName != null ? cacheName.getBytes() : null; + + Data keyData = encode(key); + + Data valData = encode(val); + + int cacheNameLength = cacheNameBytes != null ? cacheNameBytes.length : 0; + int extrasLength = cmd.extrasLength() + cacheNameLength; + + byte[] packet = new byte[HDR_LEN + extrasLength + keyData.length() + valData.length()]; + + packet[0] = (byte)0x80; + packet[1] = cmd.operationCode(); + + U.shortToBytes((short)keyData.length(), packet, 2); + + packet[4] = (byte)(extrasLength); + + U.intToBytes(extrasLength + keyData.length() + valData.length(), packet, 8); + U.intToBytes(opaque, packet, 12); + + if (extrasLength > 0) { + if (extras != null) { + int offset = HDR_LEN; + + for (Long extra : extras) { + if (extra != null) + U.longToBytes(extra, packet, offset); + + offset += 8; + } + } + else { + U.shortToBytes(keyData.getFlags(), packet, HDR_LEN); + U.shortToBytes(valData.getFlags(), packet, HDR_LEN + 2); + } + } + + if (cacheNameBytes != null) + U.arrayCopy(cacheNameBytes, 0, packet, HDR_LEN + cmd.extrasLength(), cacheNameLength); + + if (keyData.getBytes() != null) + U.arrayCopy(keyData.getBytes(), 0, packet, HDR_LEN + extrasLength, keyData.length()); + + if (valData.getBytes() != null) + U.arrayCopy(valData.getBytes(), 0, packet, HDR_LEN + extrasLength + keyData.length(), valData.length()); + + return packet; + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param val Value. + * @return If value was actually put. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> boolean cachePut(@Nullable String cacheName, K key, V val) + throws IgniteCheckedException { + assert key != null; + assert val != null; + + return makeRequest(Command.PUT, cacheName, key, val).isSuccess(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @return Value. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> V cacheGet(@Nullable String cacheName, K key) + throws IgniteCheckedException { + assert key != null; + + return makeRequest(Command.GET, cacheName, key, null).getObject(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @return Whether entry was actually removed. + * @throws IgniteCheckedException In case of error. + */ + public <K> boolean cacheRemove(@Nullable String cacheName, K key) throws IgniteCheckedException { + assert key != null; + + return makeRequest(Command.REMOVE, cacheName, key, null).isSuccess(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param val Value. + * @return Whether entry was added. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> boolean cacheAdd(@Nullable String cacheName, K key, V val) + throws IgniteCheckedException { + assert key != null; + assert val != null; + + return makeRequest(Command.ADD, cacheName, key, val).isSuccess(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param val Value. + * @return Whether value was actually replaced. + * @throws IgniteCheckedException In case of error. + */ + public <K, V> boolean cacheReplace(@Nullable String cacheName, K key, V val) + throws IgniteCheckedException { + assert key != null; + assert val != null; + + return makeRequest(Command.REPLACE, cacheName, key, val).isSuccess(); + } + + /** + * @param cacheName Cache name. + * @throws IgniteCheckedException In case of error. + */ + public <K> Map<String, Long> cacheMetrics(@Nullable String cacheName) throws IgniteCheckedException { + List<Response> raw = makeMultiRequest(Command.CACHE_METRICS, cacheName, null, null); + + Map<String, Long> res = new HashMap<>(raw.size()); + + for (Response resp : raw) + res.put((String)resp.key(), Long.parseLong(String.valueOf(resp.getObject()))); + + return res; + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param init Initial value (optional). + * @param incr Amount to add. + * @return New value. + * @throws IgniteCheckedException In case of error. + */ + public <K> long cacheIncrement(@Nullable String cacheName, K key, @Nullable Long init, long incr) + throws IgniteCheckedException { + assert key != null; + + return makeRequest(Command.INCREMENT, cacheName, key, null, incr, init).<Long>getObject(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param init Initial value (optional). + * @param decr Amount to subtract. + * @return New value. + * @throws IgniteCheckedException In case of error. + */ + public <K> long cacheDecrement(@Nullable String cacheName, K key, @Nullable Long init, long decr) + throws IgniteCheckedException { + assert key != null; + + return makeRequest(Command.DECREMENT, cacheName, key, null, decr, init).<Long>getObject(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param val Value to append. + * @return Whether operation succeeded. + * @throws IgniteCheckedException In case of error. + */ + public <K> boolean cacheAppend(@Nullable String cacheName, K key, String val) + throws IgniteCheckedException { + assert key != null; + assert val != null; + + return makeRequest(Command.APPEND, cacheName, key, val).isSuccess(); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param val Value to prepend. + * @return Whether operation succeeded. + * @throws IgniteCheckedException In case of error. + */ + public <K> boolean cachePrepend(@Nullable String cacheName, K key, String val) + throws IgniteCheckedException { + assert key != null; + assert val != null; + + return makeRequest(Command.PREPEND, cacheName, key, val).isSuccess(); + } + + /** + * @return Version. + * @throws IgniteCheckedException In case of error. + */ + public String version() throws IgniteCheckedException { + return makeRequest(Command.VERSION, null, null, null).getObject(); + } + + /** + * @throws IgniteCheckedException In case of error. + */ + public void noop() throws IgniteCheckedException { + Response res = makeRequest(Command.NOOP, null, null, null); + + assert res != null; + assert res.isSuccess(); + assert res.getObject() == null; + } + + /** + * @throws IgniteCheckedException In case of error. + */ + public void quit() throws IgniteCheckedException { + makeRequest(Command.QUIT, null, null, null); + + assert sock.isClosed(); + } + + /** + * Encodes object. + * + * @param obj Object. + * @return Encoded data. + * @throws IgniteCheckedException In case of error. + */ + public Data encode(@Nullable Object obj) throws IgniteCheckedException { + if (obj == null) + return new Data(null, (short)0); + + byte[] bytes; + short flags = 0; + + if (obj instanceof String) + bytes = ((String)obj).getBytes(); + else if (obj instanceof Boolean) { + bytes = new byte[] {(byte)((Boolean)obj ? '1' : '0')}; + + flags |= BOOLEAN_FLAG; + } + else if (obj instanceof Integer) { + bytes = U.intToBytes((Integer)obj); + + flags |= INT_FLAG; + } + else if (obj instanceof Long) { + bytes = U.longToBytes((Long)obj); + + flags |= LONG_FLAG; + } + else if (obj instanceof Date) { + bytes = U.longToBytes(((Date)obj).getTime()); + + flags |= DATE_FLAG; + } + else if (obj instanceof Byte) { + bytes = new byte[] {(Byte)obj}; + + flags |= BYTE_FLAG; + } + else if (obj instanceof Float) { + bytes = U.intToBytes(Float.floatToIntBits((Float)obj)); + + flags |= FLOAT_FLAG; + } + else if (obj instanceof Double) { + bytes = U.longToBytes(Double.doubleToLongBits((Double)obj)); + + flags |= DOUBLE_FLAG; + } + else if (obj instanceof byte[]) { + bytes = (byte[])obj; + + flags |= BYTE_ARR_FLAG; + } + else { + bytes = jdkMarshaller.marshal(obj); + + flags |= SERIALIZED_FLAG; + } + + return new Data(bytes, flags); + } + + /** + * @param bytes Byte array to decode. + * @param flags Flags. + * @return Decoded value. + * @throws IgniteCheckedException In case of error. + */ + public Object decode(byte[] bytes, short flags) throws IgniteCheckedException { + assert bytes != null; + assert flags >= 0; + + if ((flags & SERIALIZED_FLAG) != 0) + return jdkMarshaller.unmarshal(bytes, getClass().getClassLoader()); + + int masked = flags & 0xff00; + + switch (masked) { + case BOOLEAN_FLAG: + return bytes[0] == '1'; + case INT_FLAG: + return U.bytesToInt(bytes, 0); + case LONG_FLAG: + return U.bytesToLong(bytes, 0); + case DATE_FLAG: + return new Date(U.bytesToLong(bytes, 0)); + case BYTE_FLAG: + return bytes[0]; + case FLOAT_FLAG: + return Float.intBitsToFloat(U.bytesToInt(bytes, 0)); + case DOUBLE_FLAG: + return Double.longBitsToDouble(U.bytesToLong(bytes, 0)); + case BYTE_ARR_FLAG: + return bytes; + default: + return new String(bytes); + } + } + + /** + * Response data. + */ + private static class Response { + /** Opaque. */ + private final int opaque; + + /** Success flag. */ + private final boolean success; + + /** Key. */ + private final Object key; + + /** Response object. */ + private final Object obj; + + /** + * @param opaque Opaque. + * @param success Success flag. + * @param key Key object. + * @param obj Response object. + */ + Response(int opaque, boolean success, @Nullable Object key, @Nullable Object obj) { + assert opaque >= 0; + + this.opaque = opaque; + this.success = success; + this.key = key; + this.obj = obj; + } + + /** + * @return Opaque. + */ + int getOpaque() { + return opaque; + } + + /** + * @return Success flag. + */ + boolean isSuccess() { + return success; + } + + Object key() { + return key; + } + + /** + * @return Response object. + */ + @SuppressWarnings("unchecked") + <T> T getObject() { + return (T)obj; + } + } + + + private static class Data { + /** Bytes. */ + private final byte[] bytes; + + /** Flags. */ + private final short flags; + + /** + * @param bytes Bytes. + * @param flags Flags. + */ + Data(@Nullable byte[] bytes, short flags) { + assert flags >= 0; + + this.bytes = bytes; + this.flags = flags; + } + + /** + * @return Bytes. + */ + @Nullable public byte[] getBytes() { + return bytes; + } + + /** + * @return Flags. + */ + public short getFlags() { + return flags; + } + + /** + * @return Length. + */ + public int length() { + return bytes != null ? bytes.length : 0; + } + } + + /** + * Command. + */ + private enum Command { + /** Get. */ + GET((byte)0x00, 4), + + /** Put. */ + PUT((byte)0x01, 8), + + /** Add. */ + ADD((byte)0x02, 8), + + /** Replace. */ + REPLACE((byte)0x03, 8), + + /** Remove. */ + REMOVE((byte)0x04, 4), + + /** Increment. */ + INCREMENT((byte)0x05, 20), + + /** Decrement. */ + DECREMENT((byte)0x06, 20), + + /** Quit. */ + QUIT((byte)0x07, 0), + + /** Cache metrics. */ + CACHE_METRICS((byte)0x10, 4), + + /** No-op. */ + NOOP((byte)0x0A, 0), + + /** Version. */ + VERSION((byte)0x0B, 0), + + /** Append. */ + APPEND((byte)0x0E, 4), + + /** Append. */ + PREPEND((byte)0x0F, 4); + + /** Operation code. */ + private final byte opCode; + + /** Extras length. */ + private final int extrasLength; + + /** + * @param opCode Operation code. + * @param extrasLength Extras length. + */ + Command(byte opCode, int extrasLength) { + this.opCode = opCode; + this.extrasLength = extrasLength; + } + + /** + * @return Operation code. + */ + public byte operationCode() { + return opCode; + } + + /** + * @return Extras length. + */ + public int extrasLength() { + return extrasLength; + } + } +}
