Repository: ignite
Updated Branches:
  refs/heads/ignite-843-rc1 4cc2dc98a -> a362b8b7a


IGNITE-1710 update employee, car with random values


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a362b8b7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a362b8b7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a362b8b7

Branch: refs/heads/ignite-843-rc1
Commit: a362b8b7a10b18a399301a08589faf913ef2497b
Parents: 4cc2dc9
Author: Andrey <[email protected]>
Authored: Tue Oct 20 11:45:07 2015 +0700
Committer: Andrey <[email protected]>
Committed: Tue Oct 20 11:45:07 2015 +0700

----------------------------------------------------------------------
 .../agent/testdrive/AgentSqlTestDrive.java      | 136 +++++++++++++++++--
 1 file changed, 123 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a362b8b7/modules/control-center-agent/src/main/java/org/apache/ignite/agent/testdrive/AgentSqlTestDrive.java
----------------------------------------------------------------------
diff --git 
a/modules/control-center-agent/src/main/java/org/apache/ignite/agent/testdrive/AgentSqlTestDrive.java
 
b/modules/control-center-agent/src/main/java/org/apache/ignite/agent/testdrive/AgentSqlTestDrive.java
index 9e387fb..9b71178 100644
--- 
a/modules/control-center-agent/src/main/java/org/apache/ignite/agent/testdrive/AgentSqlTestDrive.java
+++ 
b/modules/control-center-agent/src/main/java/org/apache/ignite/agent/testdrive/AgentSqlTestDrive.java
@@ -23,7 +23,13 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import org.apache.ignite.Ignite;
@@ -87,6 +93,12 @@ public class AgentSqlTestDrive {
     /** Departments count */
     private static final int PARK_CNT = 10;
 
+    /** Counter for threads in pool. */
+    private static final AtomicInteger THREAD_CNT = new AtomicInteger(0);
+
+    /** */
+    private static ScheduledExecutorService cachePool;
+
     /**
      * Configure cacheEmployee.
      *
@@ -291,8 +303,9 @@ public class AgentSqlTestDrive {
     /**
      * @param ignite Ignite.
      * @param name Cache name.
+     * @param range Time range in milliseconds.
      */
-    private static void populateCacheEmployee(Ignite ignite, String name) {
+    private static void populateCacheEmployee(Ignite ignite, String name, long 
range) {
         log.log(Level.FINE, "TEST-DRIVE-SQL: Start population cache: '" + name 
+ "' with data...");
 
         IgniteCache<CountryKey, Country> cacheCountry = ignite.cache(name);
@@ -311,12 +324,6 @@ public class AgentSqlTestDrive {
 
         IgniteCache<EmployeeKey, Employee> cacheEmployee = ignite.cache(name);
 
-        long off = java.sql.Date.valueOf("2007-01-01").getTime();
-
-        long end = java.sql.Date.valueOf("2016-01-01").getTime();
-
-        long diff = end - off + 1;
-
         for (int i = 0; i < EMPL_CNT; i++) {
             Integer mgrId = (i == 0 || rnd.nextBoolean()) ? null : 
rnd.nextInt(i);
 
@@ -324,7 +331,7 @@ public class AgentSqlTestDrive {
 
             cacheEmployee.put(new EmployeeKey(i),
                 new Employee(i, "first name " + (i + 1), "last name " + (i + 
1), "email " + (i + 1),
-                    "phone number " + (i + 1), new java.sql.Date(off + 
(long)(r * diff)), "job " + (i + 1),
+                    "phone number " + (i + 1), new java.sql.Date((long)(r * 
range)), "job " + (i + 1),
                     round(r * 5000, 2) , mgrId, rnd.nextInt(DEP_CNT)));
         }
 
@@ -343,16 +350,121 @@ public class AgentSqlTestDrive {
         for (int i = 0; i < PARK_CNT; i++)
             cacheParking.put(new ParkingKey(i), new Parking(i, "Parking " + (i 
+ 1)));
 
-        IgniteCache<CarKey, Car> cacheDepartment = ignite.cache(name);
+        IgniteCache<CarKey, Car> cacheCar = ignite.cache(name);
 
         for (int i = 0; i < CAR_CNT; i++)
-            cacheDepartment.put(new CarKey(i), new Car(i, 
rnd.nextInt(PARK_CNT), "Car " + (i + 1)));
+            cacheCar.put(new CarKey(i), new Car(i, rnd.nextInt(PARK_CNT), "Car 
" + (i + 1)));
 
 
         log.log(Level.FINE, "TEST-DRIVE-SQL: Finished population cache: '" + 
name + "' with data.");
     }
 
     /**
+     * Creates a thread pool that can schedule commands to run after a given 
delay, or to execute periodically.
+     *
+     * @param corePoolSize Number of threads to keep in the pool, even if they 
are idle.
+     * @param threadName Part of thread name that would be used by thread 
factory.
+     * @return Newly created scheduled thread pool.
+     */
+    private static ScheduledExecutorService newScheduledThreadPool(int 
corePoolSize, final String threadName) {
+        ScheduledExecutorService srvc = 
Executors.newScheduledThreadPool(corePoolSize, new ThreadFactory() {
+            @Override public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r, String.format("%s-%d", 
threadName, THREAD_CNT.getAndIncrement()));
+
+                thread.setDaemon(true);
+
+                return thread;
+            }
+        });
+
+        ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) 
srvc;
+
+        // Setting up shutdown policy.
+        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
+        return srvc;
+    }
+
+    /**
+     * Starts read and write from cache in background.
+     *
+     * @param ignite Ignite.
+     * @param n - maximum count read/write key
+     */
+    private static void startLoad(final Ignite ignite, final int n) {
+        final long diff = new java.util.Date().getTime();
+
+        populateCacheEmployee(ignite, EMPLOYEE_CACHE_NAME, diff);
+
+        populateCacheCar(ignite, CAR_CACHE_NAME);
+
+        if (cachePool != null)
+            cachePool.shutdownNow();
+
+        cachePool = newScheduledThreadPool(2, 
"test-drive-sql-load-cache-tasks");
+
+        if (cachePool != null) {
+            cachePool.scheduleWithFixedDelay(new Runnable() {
+                @Override public void run() {
+                    try {
+                        IgniteCache<EmployeeKey, Employee> cache = 
ignite.cache(EMPLOYEE_CACHE_NAME);
+
+                        if (cache != null)
+                            for (int i = 0; i < n; i++) {
+                                Integer employeeId = rnd.nextInt(EMPL_CNT);
+
+                                Integer mgrId = (i == 0 || rnd.nextBoolean()) 
? null : rnd.nextInt(employeeId);
+
+                                double r = rnd.nextDouble();
+
+                                cache.put(new EmployeeKey(employeeId),
+                                    new Employee(employeeId, "first name " + 
(i + 1), "last name " + (i + 1),
+                                        "email " + (i + 1), "phone number " + 
(i + 1),
+                                        new java.sql.Date((long)(r * diff)), 
"job " + (i + 1),
+                                        round(r * 5000, 2) , mgrId, 
rnd.nextInt(DEP_CNT)));
+
+                                if (rnd.nextBoolean())
+                                    cache.remove(new 
EmployeeKey(rnd.nextInt(EMPL_CNT)));
+                            }
+                    }
+                    catch (IllegalStateException ignored) {
+                    }
+                    catch (Throwable e) {
+                        if (!e.getMessage().contains("cache is stopped"))
+                            ignite.log().error("Cache write task execution 
error", e);
+                    }
+                }
+            }, 10, 3, TimeUnit.SECONDS);
+
+            cachePool.scheduleWithFixedDelay(new Runnable() {
+                @Override public void run() {
+                    try {
+                        IgniteCache<CarKey, Car> cache = 
ignite.cache(CAR_CACHE_NAME);
+
+                        if (cache != null)
+                            for (int i = 0; i < n; i++) {
+                                Integer carId = rnd.nextInt(CAR_CNT);
+
+                                cache.put(new CarKey(carId), new Car(carId, 
rnd.nextInt(PARK_CNT), "Car " + (i + 1)));
+
+                                if (rnd.nextBoolean())
+                                    cache.remove(new 
CarKey(rnd.nextInt(CAR_CNT)));
+                            }
+                    }
+                    catch (IllegalStateException ignored) {
+                    }
+                    catch (Throwable e) {
+                        if (!e.getMessage().contains("cache is stopped"))
+                            ignite.log().error("Cache write task execution 
error", e);
+                    }
+                }
+            }, 10, 3, TimeUnit.SECONDS);
+        }
+    }
+
+
+    /**
      * Start ignite node with cacheEmployee and populate it with data.
      */
     public static void testDrive(AgentConfiguration acfg) {
@@ -400,9 +512,7 @@ public class AgentSqlTestDrive {
 
                 log.log(Level.INFO, "TEST-DRIVE-SQL: Embedded node for sql 
test-drive successfully started");
 
-                populateCacheEmployee(ignite, EMPLOYEE_CACHE_NAME);
-
-                populateCacheCar(ignite, CAR_CACHE_NAME);
+                startLoad(ignite, 20);
             }
             catch (Exception e) {
                 log.log(Level.SEVERE, "TEST-DRIVE-SQL: Failed to start 
embedded node for sql test-drive!", e);

Reply via email to