http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoveryTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoveryTest.java index 0000000,2219f38..a917108 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoveryTest.java @@@ -1,0 -1,153 +1,153 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.spi.discovery; + + import org.apache.ignite.cluster.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.managers.eventstorage.*; + import org.apache.ignite.testframework.junits.spi.*; + + import javax.swing.*; + import java.io.*; + import java.util.*; + + /** + * Base discovery test class. + * @param <T> SPI implementation class. + */ + @SuppressWarnings({"JUnitAbstractTestClassNamingConvention"}) + public abstract class AbstractDiscoveryTest<T extends DiscoverySpi> extends GridSpiAbstractTest<T> { + /** */ + @SuppressWarnings({"ClassExplicitlyExtendsThread"}) + private class Pinger extends Thread { + /** */ + private final Object mux = new Object(); + + /** */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + private boolean isCanceled; + + /** {@inheritDoc} */ + @SuppressWarnings({"UnusedCatchParameter"}) + @Override public void run() { + Random rnd = new Random(); + + while (isCanceled) { + try { + Collection<ClusterNode> nodes = getSpi().getRemoteNodes(); + + pingNode(UUID.randomUUID(), false); + + for (ClusterNode item : nodes) { + pingNode(item.id(), true); + } + + pingNode(UUID.randomUUID(), false); + } + catch (Exception e) { + error("Can't get SPI.", e); + } + + synchronized (mux) { + if (isCanceled) { + try { + mux.wait(getPingFrequency() * (1 + rnd.nextInt(10))); + } + catch (InterruptedException e) { + //No-op. + } + } + } + } + } + + /** + * @param nodeId Node UUID. + * @param exists Exists flag. + * @throws Exception If failed. + */ + private void pingNode(UUID nodeId, boolean exists) throws Exception { + boolean flag = getSpi().pingNode(nodeId); + + info((flag != exists ? "***Error*** " : "") + "Ping " + (exists ? "exist" : "random") + + " node [nodeId=" + nodeId + ", pingResult=" + flag + ']'); + } + + /** {@inheritDoc} */ + @Override public void interrupt() { + synchronized (mux) { + isCanceled = true; + + mux.notifyAll(); + } + + super.interrupt(); + } + } + + /** + * @return Ping frequency. + */ + public abstract long getPingFrequency(); + + /** + * @return Pinger start flag. + */ + public boolean isPingerStart() { + return true; + } + + /** */ + private class DiscoveryListener implements GridLocalEventListener { + /** {@inheritDoc} */ - @Override public void onEvent(IgniteEvent evt) { ++ @Override public void onEvent(Event evt) { + info("Discovery event [event=" + evt + ']'); + } + } + + /** + * @throws Exception If failed. + */ + public void testDiscovery() throws Exception { + GridLocalEventListener discoLsnr = new DiscoveryListener(); + + getSpiContext().addLocalEventListener(discoLsnr); + + Pinger pinger = null; + + if (isPingerStart()) { + pinger = new Pinger(); + + pinger.start(); + } + + JOptionPane.showMessageDialog(null, "Press OK to end test."); + + if (pinger != null) + pinger.interrupt(); + } + + /** {@inheritDoc} */ + @Override protected Map<String, Serializable> getNodeAttributes() { + Map<String, Serializable> attrs = new HashMap<>(1); + + attrs.put("testDiscoveryAttribute", new Date()); + + return attrs; + } + }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java index 0000000,eb3be6d..fa6a6c9 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java @@@ -1,0 -1,691 +1,691 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.spi.discovery.tcp; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.resources.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.*; + import org.apache.ignite.testframework.junits.common.*; + + import java.net.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.events.IgniteEventType.*; ++import static org.apache.ignite.events.EventType.*; + + /** + * Client-based discovery tests. + */ + public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final AtomicInteger srvIdx = new AtomicInteger(); + + /** */ + private static final AtomicInteger clientIdx = new AtomicInteger(); + + /** */ + private static Collection<UUID> srvNodeIds; + + /** */ + private static Collection<UUID> clientNodeIds; + + /** */ + private static int clientsPerSrv; + + /** */ + private static CountDownLatch srvJoinedLatch; + + /** */ + private static CountDownLatch srvLeftLatch; + + /** */ + private static CountDownLatch srvFailedLatch; + + /** */ + private static CountDownLatch clientJoinedLatch; + + /** */ + private static CountDownLatch clientLeftLatch; + + /** */ + private static CountDownLatch clientFailedLatch; + + /** */ + private static CountDownLatch msgLatch; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost("127.0.0.1"); + + if (gridName.startsWith("server")) { + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + } + else if (gridName.startsWith("client")) { + TcpClientDiscoverySpi disco = new TcpClientDiscoverySpi(); + + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); + + String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()). + get((clientIdx.get() - 1) / clientsPerSrv).toString(); + + if (addr.startsWith("/")) + addr = addr.substring(1); + + ipFinder.setAddresses(Arrays.asList(addr)); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses(); + + if (!F.isEmpty(addrs)) + IP_FINDER.unregisterAddresses(addrs); + + srvIdx.set(0); + clientIdx.set(0); + + srvNodeIds = new GridConcurrentHashSet<>(); + clientNodeIds = new GridConcurrentHashSet<>(); + + clientsPerSrv = 2; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllClients(true); + stopAllServers(true); + + assert G.allGrids().isEmpty(); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeJoin() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvJoinedLatch = new CountDownLatch(3); + clientJoinedLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + startClientNodes(1); + + await(srvJoinedLatch); + await(clientJoinedLatch); + + checkNodes(3, 4); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeLeave() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvLeftLatch = new CountDownLatch(3); + clientLeftLatch = new CountDownLatch(2); + + attachListeners(3, 3); + + stopGrid("client-2"); + + await(srvLeftLatch); + await(clientLeftLatch); + + checkNodes(3, 2); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeFail() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvFailedLatch = new CountDownLatch(3); + clientFailedLatch = new CountDownLatch(2); + + attachListeners(3, 3); + + failClient(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + checkNodes(3, 2); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeJoin() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvJoinedLatch = new CountDownLatch(3); + clientJoinedLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + startServerNodes(1); + + await(srvJoinedLatch); + await(clientJoinedLatch); + + checkNodes(4, 3); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeLeave() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvLeftLatch = new CountDownLatch(2); + clientLeftLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + stopGrid("server-2"); + + await(srvLeftLatch); + await(clientLeftLatch); + + checkNodes(2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeFail() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvFailedLatch = new CountDownLatch(2); + clientFailedLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + assert U.<Map>field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty(); + + failServer(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + checkNodes(2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testClientReconnect() throws Exception { + clientsPerSrv = 1; + + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + resetClientIpFinder(2); + + srvFailedLatch = new CountDownLatch(2); + clientFailedLatch = new CountDownLatch(3); + + attachListeners(2, 3); + + failServer(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + checkNodes(2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeJoinOneServer() throws Exception { + startServerNodes(1); + + srvJoinedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + startClientNodes(1); + + await(srvJoinedLatch); + + checkNodes(1, 1); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeLeaveOneServer() throws Exception { + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + srvLeftLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + stopGrid("client-0"); + + await(srvLeftLatch); + + checkNodes(1, 0); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeFailOneServer() throws Exception { + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + srvFailedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + failClient(0); + + await(srvFailedLatch); + + checkNodes(1, 0); + } + + /** + * @throws Exception If failed. + */ + public void testMetrics() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + attachListeners(3, 3); + + assertTrue(checkMetrics(3, 3, 0)); + + G.ignite("client-0").compute().broadcast(F.noop()); + + assertTrue(GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return checkMetrics(3, 3, 1); + } + }, 10000)); + + checkMetrics(3, 3, 1); + + G.ignite("server-0").compute().broadcast(F.noop()); + + assertTrue(GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return checkMetrics(3, 3, 2); + } + }, 10000)); + } + + /** + * @param srvCnt Number of Number of server nodes. + * @param clientCnt Number of client nodes. + * @param execJobsCnt Expected number of executed jobs. + * @return Whether metrics are correct. + */ + private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) { + for (int i = 0; i < srvCnt; i++) { + Ignite g = G.ignite("server-" + i); + + for (ClusterNode n : g.cluster().nodes()) { + if (n.metrics().getTotalExecutedJobs() != execJobsCnt) + return false; + } + } + + for (int i = 0; i < clientCnt; i++) { + Ignite g = G.ignite("client-" + i); + + for (ClusterNode n : g.cluster().nodes()) { + if (n.metrics().getTotalExecutedJobs() != execJobsCnt) + return false; + } + } + + return true; + } + + /** + * @throws Exception If failed. + */ + public void testDataExchangeFromServer() throws Exception { + testDataExchange("server-0"); + } + + /** + * @throws Exception If failed. + */ + // TODO: GG-9174 + public void _testDataExchangeFromClient() throws Exception { + testDataExchange("client-0"); + } + + /** + * @throws Exception If failed. + */ + private void testDataExchange(String masterName) throws Exception { + startServerNodes(2); + startClientNodes(2); + + checkNodes(2, 2); + + IgniteMessaging msg = grid(masterName).message(); + + UUID id = null; + + try { + id = msg.remoteListen(null, new MessageListener()); + + msgLatch = new CountDownLatch(4); + + msg.send(null, "Message 1"); + + await(msgLatch); + + startServerNodes(1); + startClientNodes(1); + + checkNodes(3, 3); + + msgLatch = new CountDownLatch(6); + + msg.send(null, "Message 2"); + + await(msgLatch); + } + finally { + if (id != null) + msg.stopRemoteListen(id); + } + } + + /** + * @param idx Index. + * @throws Exception In case of error. + */ + private void resetClientIpFinder(int idx) throws Exception { + TcpClientDiscoverySpi disco = + (TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi(); + + TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder(); + + String addr = IP_FINDER.getRegisteredAddresses().iterator().next().toString(); + + if (addr.startsWith("/")) + addr = addr.substring(1); + + ipFinder.setAddresses(Arrays.asList(addr)); + } + + /** + * @param cnt Number of nodes. + * @throws Exception In case of error. + */ + private void startServerNodes(int cnt) throws Exception { + for (int i = 0; i < cnt; i++) { + Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + + srvNodeIds.add(g.cluster().localNode().id()); + } + } + + /** + * @param cnt Number of nodes. + * @throws Exception In case of error. + */ + private void startClientNodes(int cnt) throws Exception { + for (int i = 0; i < cnt; i++) { + Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); + + clientNodeIds.add(g.cluster().localNode().id()); + } + } + + /** + * @param idx Index. + */ + private void failServer(int idx) { + ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); + } + + /** + * @param idx Index. + */ + private void failClient(int idx) { + ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); + } + + /** + * @param srvCnt Number of server nodes. + * @param clientCnt Number of client nodes. + */ + private void attachListeners(int srvCnt, int clientCnt) throws Exception { + if (srvJoinedLatch != null) { + for (int i = 0; i < srvCnt; i++) { - G.ignite("server-" + i).events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + info("Joined event fired on server: " + evt); + + srvJoinedLatch.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + } + } + + if (srvLeftLatch != null) { + for (int i = 0; i < srvCnt; i++) { - G.ignite("server-" + i).events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + info("Left event fired on server: " + evt); + + srvLeftLatch.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + } + } + + if (srvFailedLatch != null) { + for (int i = 0; i < srvCnt; i++) { - G.ignite("server-" + i).events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + info("Failed event fired on server: " + evt); + + srvFailedLatch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + } + } + + if (clientJoinedLatch != null) { + for (int i = 0; i < clientCnt; i++) { - G.ignite("client-" + i).events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + info("Joined event fired on client: " + evt); + + clientJoinedLatch.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + } + } + + if (clientLeftLatch != null) { + for (int i = 0; i < clientCnt; i++) { - G.ignite("client-" + i).events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + info("Left event fired on client: " + evt); + + clientLeftLatch.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + } + } + + if (clientFailedLatch != null) { + for (int i = 0; i < clientCnt; i++) { - G.ignite("client-" + i).events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + info("Failed event fired on client: " + evt); + + clientFailedLatch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + } + } + } + + /** + * @param srvCnt Number of server nodes. + * @param clientCnt Number of client nodes. + */ + private void checkNodes(int srvCnt, int clientCnt) { + for (int i = 0; i < srvCnt; i++) { + Ignite g = G.ignite("server-" + i); + + assertTrue(srvNodeIds.contains(g.cluster().localNode().id())); + + assertFalse(g.cluster().localNode().isClient()); + + checkRemoteNodes(g, srvCnt + clientCnt - 1); + } + + for (int i = 0; i < clientCnt; i++) { + Ignite g = G.ignite("client-" + i); + + assertTrue(clientNodeIds.contains(g.cluster().localNode().id())); + + assertTrue(g.cluster().localNode().isClient()); + + checkRemoteNodes(g, srvCnt + clientCnt - 1); + } + } + + /** + * @param ignite Grid. + * @param expCnt Expected nodes count. + */ + @SuppressWarnings("TypeMayBeWeakened") + private void checkRemoteNodes(Ignite ignite, int expCnt) { + Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes(); + + assertEquals(expCnt, nodes.size()); + + for (ClusterNode node : nodes) { + UUID id = node.id(); + + if (clientNodeIds.contains(id)) + assertTrue(node.isClient()); + else if (srvNodeIds.contains(id)) + assertFalse(node.isClient()); + else + assert false : "Unexpected node ID: " + id; + } + } + + /** + * @param latch Latch. + * @throws InterruptedException If interrupted. + */ + private void await(CountDownLatch latch) throws InterruptedException { + assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS)); + } + + /** + */ + private static class MessageListener implements IgniteBiPredicate<UUID, Object> { + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object msg) { + X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']'); + + msgLatch.countDown(); + + return true; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMarshallerCheckSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMarshallerCheckSelfTest.java index 0000000,39f931b..6a5a1df mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMarshallerCheckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMarshallerCheckSelfTest.java @@@ -1,0 -1,102 +1,102 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.spi.discovery.tcp; + + import org.apache.ignite.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.marshaller.jdk.*; + import org.apache.ignite.marshaller.optimized.*; + import org.apache.ignite.spi.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.junits.common.*; + + /** + * Test for {@link TcpDiscoverySpi}. + */ + public class TcpDiscoveryMarshallerCheckSelfTest extends GridCommonAbstractTest { + /** */ + private static boolean sameMarsh; + + /** */ + private static boolean flag; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setLocalHost("127.0.0.1"); + + if (flag) - cfg.setMarshaller(new IgniteJdkMarshaller()); ++ cfg.setMarshaller(new JdkMarshaller()); + else - cfg.setMarshaller(sameMarsh ? new IgniteJdkMarshaller() : new IgniteOptimizedMarshaller()); ++ cfg.setMarshaller(sameMarsh ? new JdkMarshaller() : new OptimizedMarshaller()); + + // Flip flag. + flag = !flag; + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + flag = false; + } + + /** + * @throws Exception If failed. + */ + public void testMarshallerInConsistency() throws Exception { + sameMarsh = false; + + startGrid(1); + + try { + startGrid(2); + + fail("Expected SPI exception was not thrown."); + } + catch (IgniteCheckedException e) { + Throwable ex = e.getCause().getCause(); + + assertTrue(ex instanceof IgniteSpiException); + assertTrue(ex.getMessage().contains("Local node's marshaller differs from remote node's marshaller")); + } + } + + /** + * @throws Exception If failed. + */ + public void testMarshallerConsistency() throws Exception { + sameMarsh = true; + + startGrid(1); + startGrid(2); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 0000000,75928af..a2d8276 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@@ -1,0 -1,206 +1,206 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.spi.discovery.tcp; + + import org.apache.ignite.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.junits.common.*; + + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + -import static org.apache.ignite.events.IgniteEventType.*; ++import static org.apache.ignite.events.EventType.*; + + /** + * Test for {@link TcpDiscoverySpi}. + */ + public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { + /** */ + private static final int GRID_CNT = 5; + + /** */ + private static final int CLIENT_GRID_CNT = 5; + + /** */ + private static final ThreadLocal<Boolean> clientFlagPerThread = new ThreadLocal<>(); + + /** */ + private static volatile boolean clientFlagGlobal; + + /** + * @return Client node flag. + */ + private static boolean client() { + Boolean client = clientFlagPerThread.get(); + + return client != null ? client : clientFlagGlobal; + } + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * @throws Exception If fails. + */ + public TcpDiscoveryMultiThreadedTest() throws Exception { + super(false); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (client()) { + TcpClientDiscoverySpi spi = new TcpClientDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + } + else { + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + } + + cfg.setCacheConfiguration(); + + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + cfg.setIncludeProperties(); + + cfg.setLocalHost("127.0.0.1"); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60 * 1000; + } + + /** + * @throws Exception If any error occurs. + */ + public void testMultiThreaded() throws Exception { + execute(); + } + + /** + * @throws Exception If any error occurs. + */ + public void testTopologyVersion() throws Exception { + startGridsMultiThreaded(GRID_CNT); + + long prev = 0; + + for (Ignite g : G.allGrids()) { + IgniteKernal kernal = (IgniteKernal)g; + + long ver = kernal.context().discovery().topologyVersion(); + + info("Top ver: " + ver); + + if (prev == 0) + prev = ver; + } + + info("Test finished."); + } + + /** + * @throws Exception If failed. + */ + private void execute() throws Exception { + info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); + + startGridsMultiThreaded(GRID_CNT); + + clientFlagGlobal = true; + + startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); + + final AtomicBoolean done = new AtomicBoolean(); + + final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT); + + IgniteInternalFuture<?> fut1 = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + clientFlagPerThread.set(true); + + int idx = clientIdx.getAndIncrement(); + + while (!done.get()) { + stopGrid(idx); + startGrid(idx); + } + + return null; + } + }, + CLIENT_GRID_CNT + ); + + final BlockingQueue<Integer> srvIdx = new LinkedBlockingQueue<>(); + + for (int i = 0; i < GRID_CNT; i++) + srvIdx.add(i); + + IgniteInternalFuture<?> fut2 = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + clientFlagPerThread.set(false); + + while (!done.get()) { + int idx = srvIdx.take(); + + stopGrid(idx); + startGrid(idx); + + srvIdx.add(idx); + } + + return null; + } + }, + GRID_CNT - 1 + ); + + Thread.sleep(getTestTimeout() - 60 * 1000); + + done.set(true); + + fut1.get(); + fut2.get(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 0000000,93962bd..aaf0744 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@@ -1,0 -1,990 +1,990 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.spi.discovery.tcp; + + import org.apache.ignite.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.port.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.spi.*; + import org.apache.ignite.spi.discovery.*; + import org.apache.ignite.spi.discovery.tcp.internal.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.spi.discovery.tcp.messages.*; + import org.apache.ignite.testframework.*; + import org.apache.ignite.testframework.junits.common.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.net.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.events.IgniteEventType.*; ++import static org.apache.ignite.events.EventType.*; + import static org.apache.ignite.spi.IgnitePortProtocol.*; + + /** + * Test for {@link TcpDiscoverySpi}. + */ + public class TcpDiscoverySelfTest extends GridCommonAbstractTest { + /** */ + private TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private Map<String, TcpDiscoverySpi> discoMap = new HashMap<>(); + + /** */ + private UUID nodeId; + + /** + * @throws Exception If fails. + */ + public TcpDiscoverySelfTest() throws Exception { + super(false); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "deprecation"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi; + + if (gridName.contains("FailBeforeNodeAddedSentSpi")) + spi = new FailBeforeNodeAddedSentSpi(); + else if (gridName.contains("FailBeforeNodeLeftSentSpi")) + spi = new FailBeforeNodeLeftSentSpi(); + else + spi = new TcpDiscoverySpi(); + + discoMap.put(gridName, spi); + + spi.setIpFinder(ipFinder); + + spi.setNetworkTimeout(2500); + + spi.setHeartbeatFrequency(1000); + + spi.setMaxMissedHeartbeats(3); + + spi.setIpFinderCleanFrequency(5000); + + spi.setJoinTimeout(5000); + + cfg.setDiscoverySpi(spi); + + cfg.setCacheConfiguration(); + + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + cfg.setIncludeProperties(); + + if (!gridName.contains("LoopbackProblemTest")) + cfg.setLocalHost("127.0.0.1"); + + if (gridName.contains("testFailureDetectionOnNodePing")) { + spi.setReconnectCount(1); // To make test faster: on Windows 1 connect takes 1 second. + spi.setHeartbeatFrequency(40000); + } + + cfg.setRestEnabled(false); + + if (nodeId != null) + cfg.setNodeId(nodeId); + + if (gridName.contains("NonSharedIpFinder")) { + TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(); + + finder.setAddresses(Arrays.asList("127.0.0.1:47501")); + + spi.setIpFinder(finder); + } + else if (gridName.contains("MulticastIpFinder")) { + TcpDiscoveryMulticastIpFinder finder = new TcpDiscoveryMulticastIpFinder(); + + finder.setAddressRequestAttempts(10); + finder.setMulticastGroup(GridTestUtils.getNextMulticastGroup(getClass())); + finder.setMulticastPort(GridTestUtils.getNextMulticastPort(getClass())); + + spi.setIpFinder(finder); + + // Loopback multicast discovery is not working on Mac OS + // (possibly due to http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7122846). + if (U.isMacOs()) + spi.setLocalAddress(F.first(U.allLocalIps())); + } + + return cfg; + } + + /** + * @throws Exception If any error occurs. + */ + public void testSingleNodeStartStop() throws Exception { + try { + startGrid(1); + } + finally { + stopGrid(1); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testThreeNodesStartStop() throws Exception { + try { + startGrid(1); + startGrid(2); + startGrid(3); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any errors occur. + */ + public void testNodeConnectMessageSize() throws Exception { + try { + Ignite g1 = startGrid(1); + + final AtomicInteger gridNameIdx = new AtomicInteger(1); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + startGrid(gridNameIdx.incrementAndGet()); + + return null; + } + }, 4, "grid-starter"); + + Collection<TcpDiscoveryNode> nodes = discoMap.get(g1.name()).ring().allNodes(); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + g1.configuration().getMarshaller().marshal(nodes, bos); + + info(">>> Approximate node connect message size [topSize=" + nodes.size() + + ", msgSize=" + bos.size() / 1024.0 + "KB]"); + } + finally { + stopAllGrids(false); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testPing() throws Exception { + try { + startGrid(1); + startGrid(2); + startGrid(3); + + info("Nodes were started"); + + for (Map.Entry<String, TcpDiscoverySpi> e : discoMap.entrySet()) { + DiscoverySpi spi = e.getValue(); + + for (Ignite g : G.allGrids()) { + boolean res = spi.pingNode(g.cluster().localNode().id()); + + assert res : e.getKey() + " failed to ping " + g.cluster().localNode().id() + " of " + g.name(); + + info(e.getKey() + " pinged " + g.cluster().localNode().id() + " of " + g.name()); + } + } + + info("All nodes pinged successfully."); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testFailureDetectionOnNodePing1() throws Exception { + try { + Ignite g1 = startGrid("testFailureDetectionOnNodePingCoordinator"); + startGrid("testFailureDetectionOnNodePing2"); + Ignite g3 = startGrid("testFailureDetectionOnNodePing3"); + + testFailureDetectionOnNodePing(g1, g3); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testFailureDetectionOnNodePing2() throws Exception { + try { + startGrid("testFailureDetectionOnNodePingCoordinator"); + Ignite g2 = startGrid("testFailureDetectionOnNodePing2"); + Ignite g3 = startGrid("testFailureDetectionOnNodePing3"); + + testFailureDetectionOnNodePing(g3, g2); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testFailureDetectionOnNodePing3() throws Exception { + try { + Ignite g1 = startGrid("testFailureDetectionOnNodePingCoordinator"); + Ignite g2 = startGrid("testFailureDetectionOnNodePing2"); + startGrid("testFailureDetectionOnNodePing3"); + + testFailureDetectionOnNodePing(g2, g1); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + private void testFailureDetectionOnNodePing(Ignite pingingNode, Ignite failedNode) throws Exception { + final CountDownLatch cnt = new CountDownLatch(1); + + pingingNode.events().localListen( - new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, - IgniteEventType.EVT_NODE_FAILED ++ EventType.EVT_NODE_FAILED + ); + + info("Nodes were started"); + + discoMap.get(failedNode.name()).simulateNodeFailure(); + + TcpDiscoverySpi spi = discoMap.get(pingingNode.name()); + + boolean res = spi.pingNode(failedNode.cluster().localNode().id()); + + assertFalse("Ping is ok for node " + failedNode.cluster().localNode().id() + ", but had to fail.", res); + + // Heartbeat interval is 40 seconds, but we should detect node failure faster. + assert cnt.await(7, SECONDS); + } + + /** + * @throws Exception If any error occurs. + */ + public void testNodeAdded() throws Exception { + try { + final Ignite g1 = startGrid(1); + + final CountDownLatch cnt = new CountDownLatch(2); + + g1.events().localListen( - new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + info("Node joined: " + evt.message()); + - IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; ++ DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + TcpDiscoveryNode node = ((TcpDiscoveryNode)discoMap.get(g1.name()). + getNode(discoEvt.eventNode().id())); + + assert node != null && node.visible(); + + cnt.countDown(); + + return true; + } + }, - IgniteEventType.EVT_NODE_JOINED ++ EventType.EVT_NODE_JOINED + ); + + startGrid(2); + startGrid(3); + + info("Nodes were started"); + + assert cnt.await(1, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testOrdinaryNodeLeave() throws Exception { + try { + Ignite g1 = startGrid(1); + startGrid(2); + startGrid(3); + + final CountDownLatch cnt = new CountDownLatch(2); + + g1.events().localListen( - new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, + EVT_NODE_LEFT + ); + + info("Nodes were started"); + + stopGrid(3); + stopGrid(2); + + boolean res = cnt.await(1, SECONDS); + + assert res; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testCoordinatorNodeLeave() throws Exception { + try { + startGrid(1); + Ignite g2 = startGrid(2); + + final CountDownLatch cnt = new CountDownLatch(1); + - g2.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ g2.events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + + info("Nodes were started"); + + stopGrid(1); + + assert cnt.await(1, SECONDS); + + // Start new grid, ensure that added to topology + final CountDownLatch cnt2 = new CountDownLatch(1); + - g2.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ g2.events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + cnt2.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + + startGrid(3); + + assert cnt2.await(1, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testOrdinaryNodeFailure() throws Exception { + try { + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + Ignite g3 = startGrid(3); + + final CountDownLatch cnt = new CountDownLatch(2); + + g1.events().localListen( - new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, - IgniteEventType.EVT_NODE_FAILED ++ EventType.EVT_NODE_FAILED + ); + + info("Nodes were started"); + + discoMap.get(g2.name()).simulateNodeFailure(); + discoMap.get(g3.name()).simulateNodeFailure(); + + assert cnt.await(25, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testCoordinatorNodeFailure() throws Exception { + try { + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + + final CountDownLatch cnt = new CountDownLatch(1); + - g2.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ g2.events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } - }, IgniteEventType.EVT_NODE_FAILED); ++ }, EventType.EVT_NODE_FAILED); + + info("Nodes were started"); + + discoMap.get(g1.name()).simulateNodeFailure(); + + assert cnt.await(20, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testMetricsSending() throws Exception { + final AtomicBoolean stopping = new AtomicBoolean(); + + try { + final CountDownLatch latch1 = new CountDownLatch(1); + + final Ignite g1 = startGrid(1); + - IgnitePredicate<IgniteEvent> lsnr1 = new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ IgnitePredicate<Event> lsnr1 = new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + info(evt.message()); + + latch1.countDown(); + + return true; + } + }; + + g1.events().localListen(lsnr1, EVT_NODE_METRICS_UPDATED); + + assert latch1.await(10, SECONDS); + + g1.events().stopLocalListen(lsnr1); + + final CountDownLatch latch1_1 = new CountDownLatch(1); + final CountDownLatch latch1_2 = new CountDownLatch(1); + final CountDownLatch latch2_1 = new CountDownLatch(1); + final CountDownLatch latch2_2 = new CountDownLatch(1); + + final Ignite g2 = startGrid(2); + + g2.events().localListen( - new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + if (stopping.get()) + return true; + + info(evt.message()); + - UUID id = ((IgniteDiscoveryEvent) evt).eventNode().id(); ++ UUID id = ((DiscoveryEvent) evt).eventNode().id(); + + if (id.equals(g1.cluster().localNode().id())) + latch2_1.countDown(); + else if (id.equals(g2.cluster().localNode().id())) + latch2_2.countDown(); + else + assert false : "Event fired for unknown node."; + + return true; + } + }, + EVT_NODE_METRICS_UPDATED + ); + - g1.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ g1.events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + if (stopping.get()) + return true; + + info(evt.message()); + - UUID id = ((IgniteDiscoveryEvent) evt).eventNode().id(); ++ UUID id = ((DiscoveryEvent) evt).eventNode().id(); + + if (id.equals(g1.cluster().localNode().id())) + latch1_1.countDown(); + else if (id.equals(g2.cluster().localNode().id())) + latch1_2.countDown(); + else + assert false : "Event fired for unknown node."; + + return true; + } + }, EVT_NODE_METRICS_UPDATED); + + assert latch1_1.await(10, SECONDS); + assert latch1_2.await(10, SECONDS); + assert latch2_1.await(10, SECONDS); + assert latch2_2.await(10, SECONDS); + } + finally { + stopping.set(true); + + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testFailBeforeNodeAddedSent() throws Exception { + try { + Ignite g1 = startGrid(1); + + final CountDownLatch joinCnt = new CountDownLatch(2); + final CountDownLatch failCnt = new CountDownLatch(1); + - g1.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ g1.events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + if (evt.type() == EVT_NODE_JOINED) + joinCnt.countDown(); + else if (evt.type() == EVT_NODE_FAILED) + failCnt.countDown(); + else + assert false : "Unexpected event type: " + evt; + + return true; + } + }, EVT_NODE_JOINED, EVT_NODE_FAILED); + + startGrid("FailBeforeNodeAddedSentSpi"); + + startGrid(3); + + assert joinCnt.await(10, SECONDS); + assert failCnt.await(10, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testFailBeforeNodeLeftSent() throws Exception { + try { + startGrid(1); + startGrid(2); + + startGrid("FailBeforeNodeLeftSentSpi"); + + Ignite g3 = startGrid(3); + + final CountDownLatch cnt = new CountDownLatch(1); + - g3.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ g3.events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + + stopGrid(1); + + assert cnt.await(20, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testIpFinderCleaning() throws Exception { + try { + ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024), + new InetSocketAddress("host2", 1024))); + + Ignite g1 = startGrid(1); + + long timeout = (long)(discoMap.get(g1.name()).getIpFinderCleanFrequency() * 1.5); + + Thread.sleep(timeout); + + assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses(); + + // Check that missing addresses are returned back. + ipFinder.unregisterAddresses(ipFinder.getRegisteredAddresses()); // Unregister valid address. + + ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024), + new InetSocketAddress("host2", 1024))); + + Thread.sleep(timeout); + + assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses(); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testNonSharedIpFinder() throws Exception { + try { + GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.sleep(4000); + + return startGrid("NonSharedIpFinder-2"); + } + }, 1, "grid-starter"); + + // This node should wait until any node "from ipFinder" appears, see log messages. + Ignite g = startGrid("NonSharedIpFinder-1"); + + assert g.cluster().localNode().order() == 2; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testMulticastIpFinder() throws Exception { + try { + for (int i = 0; i < 5; i++) { + Ignite g = startGrid("MulticastIpFinder-" + i); + + assertEquals(i + 1, g.cluster().nodes().size()); + + TcpDiscoverySpi spi = (TcpDiscoverySpi)g.configuration().getDiscoverySpi(); + + TcpDiscoveryMulticastIpFinder ipFinder = (TcpDiscoveryMulticastIpFinder)spi.getIpFinder(); + + boolean found = false; + + for (GridPortRecord rec : ((IgniteKernal) g).context().ports().records()) { + if ((rec.protocol() == UDP) && rec.port() == ipFinder.getMulticastPort()) { + found = true; + + break; + } + } + + assertTrue("GridTcpDiscoveryMulticastIpFinder should register port." , found); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testInvalidAddressIpFinder() throws Exception { + ipFinder.setShared(false); + + ipFinder.setAddresses(Collections.singletonList("some-host")); + + try { + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + startGrid(1); + + return null; + } + }, + IgniteCheckedException.class, + null); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testJoinTimeout() throws Exception { + try { + // This start will fail as expected. + Throwable t = GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + startGrid("NonSharedIpFinder-1"); + + return null; + } + }, IgniteCheckedException.class, null); + + assert X.hasCause(t, IgniteSpiException.class) : "Unexpected exception: " + t; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testDirtyIpFinder() throws Exception { + try { + // Dirty IP finder + for (int i = 47500; i < 47520; i++) + ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("127.0.0.1", i), + new InetSocketAddress("unknown-host", i))); + + assert ipFinder.isShared(); + + startGrid(1); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testDuplicateId() throws Exception { + try { + // Random ID. + startGrid(1); + + nodeId = UUID.randomUUID(); + + startGrid(2); + + // Duplicate ID. + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + // Exception will be thrown and output to log. + startGrid(3); + + return null; + } + }, + IgniteCheckedException.class, + null); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testLoopbackProblemFirstNodeOnLoopback() throws Exception { + // On Windows and Mac machines two nodes can reside on the same port + // (if one node has localHost="127.0.0.1" and another has localHost="0.0.0.0"). + // So two nodes do not even discover each other. + if (U.isWindows() || U.isMacOs()) + return; + + try { + startGridNoOptimize(1); + + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + // Exception will be thrown because we start node which does not use loopback address, + // but the first node does. + startGridNoOptimize("LoopbackProblemTest"); + + return null; + } + }, + IgniteException.class, + null); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testLoopbackProblemSecondNodeOnLoopback() throws Exception { + if (U.isWindows() || U.isMacOs()) + return; + + try { + startGridNoOptimize("LoopbackProblemTest"); + + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + // Exception will be thrown because we start node which uses loopback address, + // but the first node does not. + startGridNoOptimize(1); + + return null; + } + }, + IgniteException.class, + null); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testGridStartTime() throws Exception { + try { + startGridsMultiThreaded(5); + + Long startTime = null; + + IgniteKernal firstGrid = null; + + Collection<IgniteKernal> grids = new ArrayList<>(); + + for (int i = 0; i < 5 ; i++) { + IgniteKernal grid = (IgniteKernal)grid(i); + + assertTrue(grid.context().discovery().gridStartTime() > 0); + + if (i > 0) + assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); + else + startTime = grid.context().discovery().gridStartTime(); + + if (grid.localNode().order() == 1) + firstGrid = grid; + else + grids.add(grid); + } + + assertNotNull(firstGrid); + + stopGrid(firstGrid.name()); + + for (IgniteKernal grid : grids) + assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); + + grids.add((IgniteKernal) startGrid(5)); + + for (IgniteKernal grid : grids) + assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); + } + finally { + stopAllGrids(); + } + } + + /** + * Starts new grid with given index. Method optimize is not invoked. + * + * @param idx Index of the grid to start. + * @return Started grid. + * @throws Exception If anything failed. + */ + private Ignite startGridNoOptimize(int idx) throws Exception { + return startGridNoOptimize(getTestGridName(idx)); + } + + /** + * Starts new grid with given name. Method optimize is not invoked. + * + * @param gridName Grid name. + * @return Started grid. + * @throws Exception If failed. + */ + private Ignite startGridNoOptimize(String gridName) throws Exception { + return G.start(getConfiguration(gridName)); + } + + /** + * + */ + private static class FailBeforeNodeAddedSentSpi extends TcpDiscoverySpi { + /** */ + private int i; + + /** {@inheritDoc} */ + @Override void onBeforeMessageSentAcrossRing(Serializable msg) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) + if (++i == 2) { + simulateNodeFailure(); + + throw new RuntimeException("Avoid message sending: " + msg.getClass()); + } + } + } + + /** + * + */ + private static class FailBeforeNodeLeftSentSpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override void onBeforeMessageSentAcrossRing(Serializable msg) { + if (msg instanceof TcpDiscoveryNodeLeftMessage) { + simulateNodeFailure(); + + throw new RuntimeException("Avoid message sending: " + msg.getClass()); + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java index 0000000,4857567..b5b8c63 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java @@@ -1,0 -1,125 +1,125 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.spi.discovery.tcp.ipfinder; + + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.resources.*; + import org.apache.ignite.testframework.junits.common.*; + + import java.lang.reflect.*; + import java.net.*; + import java.util.*; + + /** + * Abstract test for ip finder. + */ + public abstract class TcpDiscoveryIpFinderAbstractSelfTest<T extends TcpDiscoveryIpFinder> + extends GridCommonAbstractTest { + /** */ + private T finder; + + /** + * Constructor. + * + * @throws Exception If any error occurs. + */ + @SuppressWarnings({"AbstractMethodCallInConstructor", "OverriddenMethodCallDuringObjectConstruction"}) + protected TcpDiscoveryIpFinderAbstractSelfTest() throws Exception { + super(false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + finder = ipFinder(); + + injectLogger(finder); + } + + /** + * @throws Exception If any error occurs. + */ + public void testIpFinder() throws Exception { + finder.initializeLocalAddresses(Arrays.asList(new InetSocketAddress(InetAddress.getLocalHost(), 1000))); + + InetSocketAddress node1 = new InetSocketAddress(InetAddress.getLocalHost(), 1000); + InetSocketAddress node2 = new InetSocketAddress(InetAddress.getLocalHost(), 1001); + + List<InetSocketAddress> initAddrs = Arrays.asList(node1, node2); + + finder.registerAddresses(Collections.singletonList(node1)); + + finder.registerAddresses(initAddrs); + + Collection<InetSocketAddress> addrs = finder.getRegisteredAddresses(); + + for (int i = 0; i < 5 && addrs.size() != 2; i++) { + U.sleep(1000); + + addrs = finder.getRegisteredAddresses(); + } + + assertEquals("Wrong collection size", 2, addrs.size()); + + for (InetSocketAddress addr : initAddrs) + assert addrs.contains(addr) : "Address is missing (got inconsistent addrs collection): " + addr; + + finder.unregisterAddresses(Collections.singletonList(node1)); + + addrs = finder.getRegisteredAddresses(); + + for (int i = 0; i < 5 && addrs.size() != 1; i++) { + U.sleep(1000); + + addrs = finder.getRegisteredAddresses(); + } + + assertEquals("Wrong collection size", 1, addrs.size()); + + finder.unregisterAddresses(finder.getRegisteredAddresses()); + + finder.close(); + } + + /** + * @param finder IP finder. + * @throws IllegalAccessException If any error occurs. + */ + protected void injectLogger(T finder) throws IllegalAccessException { + assert finder != null; + + for (Class cls = finder.getClass(); cls != Object.class; cls = cls.getSuperclass()) + for (Field fld : cls.getDeclaredFields()) - if (fld.getAnnotation(IgniteLoggerResource.class) != null) { ++ if (fld.getAnnotation(LoggerResource.class) != null) { + boolean accessible = fld.isAccessible(); + + fld.setAccessible(true); + + fld.set(finder, log); + + fld.setAccessible(accessible); + } + } + + /** + * Creates and initializes ip finder. + * + * @return IP finder. + * @throws Exception If any error occurs. + */ + protected abstract T ipFinder() throws Exception; + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/spi/swapspace/GridSwapSpaceSpiAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/startup/GridRandomCommandLineLoader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/startup/cmdline/GridCommandLineLoaderTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/testframework/junits/logger/GridTestLog4jLogger.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/testframework/junits/logger/GridTestLog4jLogger.java index 6203f2d,f6f27cf..098f713 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/logger/GridTestLog4jLogger.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/logger/GridTestLog4jLogger.java @@@ -60,11 -60,11 +60,11 @@@ import static org.apache.ignite.IgniteS * Please take a look at <a target=_new href="http://logging.apache.org/log4j/1.2/index.html>Apache Log4j 1.2</a> * for additional information. * <p> - * It's recommended to use GridGain logger injection instead of using/instantiating + * It's recommended to use Ignite logger injection instead of using/instantiating - * logger in your task/job code. See {@link org.apache.ignite.resources.IgniteLoggerResource} annotation about logger + * logger in your task/job code. See {@link org.apache.ignite.resources.LoggerResource} annotation about logger * injection. */ -public class GridTestLog4jLogger implements IgniteLogger, IgniteLoggerNodeIdAware { +public class GridTestLog4jLogger implements IgniteLogger, LoggerNodeIdAware { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteFsTestSuite.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/core/src/test/webapp/META-INF/gg-config.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/hadoop/src/main/java/org/apache/ignite/ignitefs/hadoop/GridGgfsHadoopParameters.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/main/java/org/apache/ignite/ignitefs/hadoop/GridGgfsHadoopParameters.java index 67bd754,0000000..fa385b9 mode 100644,000000..100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/ignitefs/hadoop/GridGgfsHadoopParameters.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/ignitefs/hadoop/GridGgfsHadoopParameters.java @@@ -1,94 -1,0 +1,94 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs.hadoop; + +/** + * This class lists parameters that can be specified in Hadoop configuration. + * Hadoop configuration can be specified in {@code core-site.xml} file + * or passed to map-reduce task directly when using Hadoop driver for GGFS file system: + * <ul> + * <li> + * {@code fs.ggfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides + * the one specified in {@link org.apache.ignite.configuration.IgniteFsConfiguration#getSequentialReadsBeforePrefetch()} + * GGFS data node configuration property. + * </li> + * <li> + * {@code fs.ggfs.[name].log.enabled} - specifies whether GGFS sampling logger is enabled. If + * {@code true}, then all file system operations will be logged to a file. + * </li> + * <li>{@code fs.ggfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li> + * <li> + * {@code fs.ggfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before + * it gets flushed to log file. Higher values will imply greater performance, but will increase delay + * before record appears in the log file. + * </li> + * <li> + * {@code fs.ggfs.[name].colocated.writes} - specifies whether written files should be colocated on data + * node to which client is connected. If {@code true}, file will not be distributed and will be written + * to a single data node. Default value is {@code true}. + * </li> + * <li> + * {@code fs.ggfs.prefer.local.writes} - specifies whether file preferably should be written to + * local data node if it has enough free space. After some time it can be redistributed across nodes though. + * </li> + * </ul> + * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in + * case your file system URI is {@code ggfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}. + * <p> + * Sample configuration that can be placed to {@code core-site.xml} file: + * <pre name="code" class="xml"> + * <property> + * <name>fs.ggfs.127.0.0.1:10500.log.enabled</name> + * <value>true</value> + * </property> + * <property> + * <name>fs.ggfs.127.0.0.1:10500.log.dir</name> - * <value>/home/gridgain/log/sampling</value> ++ * <value>/home/apache/ignite/log/sampling</value> + * </property> + * <property> + * <name>fs.ggfs.127.0.0.1:10500.log.batch_size</name> + * <value>16</value> + * </property> + * </pre> + * Parameters could also be specified per mapreduce job, e.g. + * <pre name="code" class="bash"> + * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.ggfs.open.sequential_reads_before_prefetch=4 + * </pre> + * If you want to use these parameters in code, then you have to substitute you file system name in it. The easiest + * way to do that is {@code String.format(PARAM_GGFS_COLOCATED_WRITES, [name])}. + */ +public class GridGgfsHadoopParameters { + /** Parameter name for control over file colocation write mode. */ + public static final String PARAM_GGFS_COLOCATED_WRITES = "fs.ggfs.%s.colocated.writes"; + + /** Parameter name for custom sequential reads before prefetch value. */ + public static final String PARAM_GGFS_SEQ_READS_BEFORE_PREFETCH = + "fs.ggfs.%s.open.sequential_reads_before_prefetch"; + + /** Parameter name for client logger directory. */ + public static final String PARAM_GGFS_LOG_DIR = "fs.ggfs.%s.log.dir"; + + /** Parameter name for log batch size. */ + public static final String PARAM_GGFS_LOG_BATCH_SIZE = "fs.ggfs.%s.log.batch_size"; + + /** Parameter name for log enabled flag. */ + public static final String PARAM_GGFS_LOG_ENABLED = "fs.ggfs.%s.log.enabled"; + + /** Parameter name for prefer local writes flag. */ + public static final String PARAM_GGFS_PREFER_LOCAL_WRITES = "fs.ggfs.prefer.local.writes"; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/hadoop/src/main/java/org/apache/ignite/ignitefs/hadoop/package.html ---------------------------------------------------------------------- diff --cc modules/hadoop/src/main/java/org/apache/ignite/ignitefs/hadoop/package.html index 2b3efe2,0000000..137055b mode 100644,000000..100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/ignitefs/hadoop/package.html +++ b/modules/hadoop/src/main/java/org/apache/ignite/ignitefs/hadoop/package.html @@@ -1,23 -1,0 +1,24 @@@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. - --> ++--> ++ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains common files for Hadoop 1.x and Hadoop 2.x distros. +</body> +</html>
